test_case_evaluator.py•37.3 kB
"""
AI 测试用例评估器
本脚本提供了一个完整的解决方案,用于从 Excel 文件中读取测试用例,
利用大型语言模型(LLM)进行自动化评估,并生成详细的改进建议。
它集成了动态token管理、自定义评估规则和需求单知识库,以实现高效、准确的评估流程。
核心功能:
1. **数据处理**:从 Excel 文件读取测试用例,并将其转换为结构化的 JSON 格式。
2. **Token管理**:使用统一的 TokenCounter(来自 common_utils),支持精确的 Token 计数器
(优先使用 `transformers` 库),并根据动态计算的提示词长度自动将大量测试用例分割成
适合模型上下文窗口的批次。
3. **动态提示词**:基于 `test_case_rules_customer.py` 中的自定义规则(如标题长度、步骤数)
构建评估提示词,确保评估标准的一致性和灵活性。
4. **知识库集成**:关联 `test_case_require_list_knowledge_base.py` 中的需求单知识库,
在评估时为 LLM 提供相关的需求背景信息,提高评估的准确性。
5. **异步评估**:使用 `aiohttp` 进行异步 API 调用,并行处理多个评估请求,提高处理效率。
6. **结果解析**:能够解析 LLM 返回的 Markdown 表格格式的评估结果,并将其转换为结构化的
JSON 数据,便于后续分析和存储。
类与模块说明:
- `TestCaseProcessor`: 处理测试用例数据。主要功能是将 Excel 文件转换为 JSON 格式,
并进行必要的字段映射和数据清理。
- `TestCaseEvaluator`: 核心评估器类。
- `__init__`: 初始化评估器,加载规则配置和需求单知识库,并根据模型上下文限制计算
token 分配策略。
- `_build_dynamic_prompt_template`: 根据规则动态构建发送给 LLM 的提示词模板。
- `estimate_batch_tokens`: 估算一个批次的测试用例在组合成单个请求后所需的 token 总量。
- `split_test_cases_by_tokens`: 根据 token 阈值将所有用例分割成多个批次。
- `evaluate_batch`: 对单个批次的测试用例执行 AI 评估,发送异步请求并获取结果。
- `parse_evaluation_result`: 解析 AI 返回的 Markdown 格式的评估结果。
- `evaluate_test_cases`: 编排整个评估流程,从批次分割到结果汇总。
处理流程:
1. `main_process` 函数启动处理流程。
2. `TestCaseProcessor` 读取 Excel 文件并转换为 JSON。
3. `TestCaseEvaluator` 加载所有测试用例。
4. `split_test_cases_by_tokens` 方法根据 `token_threshold` 将用例分割成多个批次。
5. 对于每个批次,`evaluate_batch` 方法构建一个包含该批次所有用例的提示词,并异步调用
LLM API。
6. `parse_evaluation_result` 方法解析返回的 Markdown 表格,提取每个用例的评分和建议。
7. 所有批次处理完成后,结果被汇总并保存到 `Proceed_TestCase_...json` 文件中。
注意:
- TokenCounter 类已迁移至 common_utils.py 统一管理,本脚本通过 get_token_counter() 获取实例
- 集成了统一的配置管理、文件管理和API管理接口
"""
import re
import json
import asyncio
import aiohttp
import pandas as pd
import sys
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
# 添加项目根目录和mcp_tools目录到路径
current_dir = Path(__file__).parent
project_root = current_dir.parent
mcp_tools_path = project_root / "mcp_tools"
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
if str(mcp_tools_path) not in sys.path:
sys.path.insert(0, str(mcp_tools_path))
# 导入公共工具
from common_utils import (
get_config,
get_api_manager,
get_file_manager,
get_token_counter,
BatchingUtils,
MarkdownUtils,
TokenBudgetUtils,
)
from test_case_rules_customer import get_test_case_rules
from test_case_require_list_knowledge_base import RequirementKnowledgeBase
class TestCaseProcessor:
"""Excel测试用例处理器"""
def __init__(self):
self.config = get_config()
self.file_manager = get_file_manager()
def excel_to_json(self, excel_file_path: str, json_file_path: str) -> List[Dict[str, Any]]:
"""
将Excel文件转换为JSON格式
参数:
excel_file_path: Excel文件路径
json_file_path: 输出JSON文件路径
返回:
转换后的数据列表
"""
try:
column_mapping = {
'用例ID': 'test_case_id',
'UUID': 'UUID',
'用例标题': 'test_case_title',
'用例目录': 'test_case_menu',
'是否自动化': 'is_automatic',
'等级': 'level',
'前置条件': 'prerequisites',
'步骤描述类型': 'step_description_type',
'步骤描述': 'step_description',
'预期结果': 'expected_result',
}
json_data = self.file_manager.read_excel_with_mapping(excel_file_path, column_mapping)
with open(json_file_path, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
print(f"成功转换 {len(json_data)} 条测试用例数据到 {json_file_path}")
return json_data
except Exception as e:
raise RuntimeError(f"Excel转JSON失败: {e}")
class TestCaseEvaluator:
"""测试用例AI评估器"""
def __init__(self, max_context_tokens: int = 8000):
self.config = get_config()
self.api_manager = get_api_manager()
self.file_manager = get_file_manager()
self.token_counter = get_token_counter()
# 初始化需求单知识库
self.requirement_kb = RequirementKnowledgeBase()
# 加载自定义规则配置
self.rules_config = get_test_case_rules()
print(f"已加载测试用例评估规则配置:")
print(f" - 标题长度≤{self.rules_config['title_max_length']}字符")
print(f" - 步骤数≤{self.rules_config['max_steps']}步")
print(f" - 优先级占比配置:")
for priority, ratio in self.rules_config['priority_ratios'].items():
print(f" * {priority}: {ratio['min']}-{ratio['max']}%")
print(f"需求单知识库: 已加载 {len(self.requirement_kb.requirements)} 个需求单")
# 上下文管理与预算(新):
# max_context_tokens 由以下部分组成:
# 20%预留tokens + 需求单tokens + 模板提示词tokens + 剩余可用tokens
# 剩余可用tokens = 待处理用例tokens + 返回表格tokens(约等于待处理用例tokens)
# 即:reserve + template + requirements + 2 * test_cases_tokens <= max_context
self.max_context_tokens = max_context_tokens
# 构建动态评估提示词模板
self.evaluation_prompt_template = self._build_dynamic_prompt_template()
# 计算提示词模板的基础token数量(不包含动态需求单与测试用例数据)
template_with_rules = self.evaluation_prompt_template
template_base = (
template_with_rules
.replace('{test_case_id}', '')
.replace('{test_case_title}', '')
.replace('{prerequisites}', '')
.replace('{step_description}', '')
.replace('{expected_result}', '')
.replace('{test_cases_json}', '')
.replace('{requirement_info}', '')
)
self.template_base_tokens = self.token_counter.count_tokens(template_base)
# 需求单信息替换后单独计算tokens(全局一次,批次内复用)
self.requirement_info_text = self.requirement_kb.get_requirements_for_evaluation()
self.requirement_tokens = self.token_counter.count_tokens(self.requirement_info_text)
# 20% 预留
self.reserve_tokens = max(64, int(self.max_context_tokens * 0.20))
# 动态可用预算(用于 请求用例 + 响应表格)
self.dynamic_pair_budget = self.max_context_tokens - (
self.reserve_tokens + self.template_base_tokens + self.requirement_tokens
)
if self.dynamic_pair_budget <= 0:
# 模板或需求单过大,给出降级提示并尽量留出最小对话空间
print(
f"警告:模板或需求单内容过大(模板={self.template_base_tokens}, 需求={self.requirement_tokens}), "
f"已超出窗口预算。将采用最小对称预算进行计算。"
)
self.dynamic_pair_budget = max(256, int(self.max_context_tokens * 0.20))
# 单边预算(仅请求用例),在“响应≈2×请求”的假设下:
# reserve + template + requirements + (request + response)
# ≈ reserve + template + requirements + (request + 2×request)
# ≈ 固定项 + 3×request <= max_context
# 因此 request 的安全上限 ≈ dynamic_pair_budget / 3
self.single_side_budget = max(64, self.dynamic_pair_budget // 3)
print(
"Token预算配置: "
f"总上下文={max_context_tokens}, 预留={self.reserve_tokens}, 模板={self.template_base_tokens}, 需求={self.requirement_tokens}, "
f"可用(请求+响应)={self.dynamic_pair_budget}, 单边(请求)≈{self.single_side_budget}(响应≈2×请求JSON)"
)
def _build_dynamic_prompt_template(self) -> str:
"""
根据自定义规则配置构建动态提示词模板
返回:
str: 动态生成的提示词模板
"""
title_max_length = self.rules_config['title_max_length']
max_steps = self.rules_config['max_steps']
priority_ratios = self.rules_config['priority_ratios']
# 构建优先级占比说明
p0_range = f"{priority_ratios['P0']['min']}%~{priority_ratios['P0']['max']}%"
p1_range = f"{priority_ratios['P1']['min']}%~{priority_ratios['P1']['max']}%"
p2_range = f"{priority_ratios['P2']['min']}%~{priority_ratios['P2']['max']}%"
template = f"""你需要为一批业务需求用例进行打分与评估。请为以下测试用例分别生成评估表格。每个用例应该有自己独立的表格。
## 重要提示:
1. 请为每个用例生成一个完整的表格
2. 每个表格都应该包含表头 "| 用例信息 | 分数 | 改进建议 |"
3. 每个表格之间用空行分隔
4. 严格按照提供的表格格式输出
5. **除此以外不需要任何分析或解释**
6. **必须从测试用例JSON数据中提取真实的字段内容,不要使用任何占位符**
## 评分规则:
| 用例要素 | 是否必须 | 要求 |
| ---- | ---- | -------------------------------------------------------------------------------------------------------------------------- |
| 关联需求 | 是 | 1. 用例应当与需求单中的一条或多条有关 |
| 用例标题 | 是 | 1. 标题长度不超过 {title_max_length} 字符,且描述测试功能点<br>2. 语言清晰,简洁易懂<br>3. 避免在用例评分中出现步骤 |
| 前置条件 | 否 | 1. 列出所有前提:账号类型,灰度等<br>2. 避免过度复杂:每个条件不超过 2 项描述,必要时分点列出 |
| 测试步骤 | 是 | 1. 步骤用编号链接:使用 1、2、3... 结构,每步描述一个动作<br>2. 步骤具体化:包含用户操作、输入值和上下文说明<br>3. 步骤数合理:不超过 {max_steps} 步,否则需分解为多个用例。<br>4. 避免步骤中带有检查点 |
| 预期结果 | 是 | 1. 描述明确结果:明确的结果,确切的检查<br>2. 避免模棱两可的词(如功能正常,跟现网一致等) |
| 优先级 | 是 | 1. 等级划分标准:采用 P0-P2<br>2. 明确标注:每个用例必须有优先级字段。<br>3. 优先级比例:P0占比应在 {p0_range} 之间,P1占比 {p1_range},P2占比 {p2_range} |
* 所有评分满分为 10 分,未提供必须提供的字段给0分,每有一点要求未满足则酌情扣1-2分,最低 0 分。
* 未提供前置条件时,给 -1 分,便于后期横向对比。
* 若用例与需求单中任何一条需求都无关,则给0分。若与某一条需求相关程度较高,则根据相关程度给出分数。若同时与多条需求相关,则根据相关程度综合评分。
* 对低于 10 分的要素给出准确的建议。给出的每一条建议都应当具体明确,且不超过100字。
## 关联需求单信息:
{{requirement_info}}
## 请为每个测试用例生成如下格式的表格
| 用例信息 | 分数 | 改进建议 |
| ----------------------------------------- | --- | ------------------- |
| **用例ID**<br>[从JSON中提取真实的test_case_id] | - | - |
| **用例标题**<br>[从JSON中提取真实的test_case_title] | 8 | 改为“验证错误密码登录的失败提示” |
| **前置条件**<br>[从JSON中提取真实的prerequisites] | 8 | 补充系统版本要求 |
| **测试步骤**<br>[从JSON中提取真实的step_description] | 6 | 步骤3增加“等待3秒” |
| **预期结果**<br>[从JSON中提取真实的expected_result] | 7 | 明确提示位置(如:输入框下方红色文字) |
表格中的“改进建议”内容仅供参考,提供建议时需要尽可能考虑到业务的方方面面,而不只是局限于表格模板。
## 测试用例JSON数据
{{test_cases_json}}
"""
return template
def estimate_batch_tokens(self, test_cases: List[Dict[str, Any]]) -> int:
"""
估算一批“仅测试用例数据部分”的token数量(不含模板与需求单)。
参数:
test_cases: 测试用例列表
返回:
仅测试用例JSON文本的token估算
"""
if not test_cases:
return 0
# 仅对测试用例JSON进行计数,模板与需求单已单独预计算
test_cases_json = json.dumps(test_cases, ensure_ascii=False, indent=2)
return self.token_counter.count_tokens(test_cases_json)
def split_test_cases_by_tokens(self, test_cases: List[Dict[str, Any]],
start_index: int = 0) -> Tuple[List[Dict[str, Any]], int]:
"""
根据token限制分割测试用例
参数:
test_cases: 全部测试用例列表
start_index: 开始索引
返回:
(当前批次的测试用例, 下一批次的开始索引)
"""
# 基于“响应≈2×请求”的单边预算进行分批(仅统计请求侧的用例JSON部分)
batch, next_index, current_tokens = BatchingUtils.split_by_token_budget(
test_cases,
estimate_tokens_fn=self.estimate_batch_tokens,
token_threshold=self.single_side_budget,
start_index=start_index,
)
# 在“响应≈2×请求”的假设下,响应上限按“2×用例JSON tokens”取值
request_tokens_est = current_tokens # 仅用例JSON部分
# 再次基于全局预算做一次安全裁剪,避免边界超限
max_pairs_left = max(
0,
self.max_context_tokens
- (self.reserve_tokens + self.template_base_tokens + self.requirement_tokens)
)
allowed_response_by_pairs = max_pairs_left - request_tokens_est
theoretical_response_cap = request_tokens_est * 2
# 受理论上限与剩余额度共同限制
response_tokens_cap = max(64, min(theoretical_response_cap, allowed_response_by_pairs))
total_estimated_tokens = (
self.reserve_tokens
+ self.template_base_tokens
+ self.requirement_tokens
+ request_tokens_est
+ response_tokens_cap
)
# 显示当前批次包含的测试用例ID与token预算
batch_ids = [case.get('test_case_id', 'N/A') for case in batch]
print(
f"当前批次包含 {len(batch)} 个测试用例,预计tokens: 模板={self.template_base_tokens}, 需求={self.requirement_tokens}, "
f"请求≈{request_tokens_est}, 响应上限≈{response_tokens_cap}, 预留={self.reserve_tokens}, "
f"合计≈{total_estimated_tokens}(窗口={self.max_context_tokens})"
)
print(f"批次包含的用例ID: {', '.join(batch_ids)}")
return batch, next_index
async def evaluate_batch(self, test_cases: List[Dict[str, Any]],
session: aiohttp.ClientSession) -> str:
"""
评估一批测试用例
参数:
test_cases: 测试用例列表
session: HTTP会话
返回:
AI评估结果
"""
# 构建批量提示词 - 一次性处理多个测试用例
test_cases_json = json.dumps(test_cases, ensure_ascii=False, indent=2)
# 使用全局已缓存的需求单信息
requirement_info = self.requirement_info_text
# 构建最终提示词
final_prompt = self.evaluation_prompt_template.format(
requirement_info=requirement_info,
test_cases_json=test_cases_json
)
# 运行期注入校验与简短预览,帮助定位“未注入需求单”的问题
try:
assert "需求单信息" in final_prompt, "提示词中缺少需求单信息标题"
if requirement_info.strip():
# 取需求单首行进行存在性检查
first_line = requirement_info.strip().splitlines()[0]
if first_line:
assert first_line in final_prompt, "需求单文本可能未正确注入提示词"
except AssertionError as _e:
print(f"[警告] 需求单注入校验失败: {_e}")
# 调试预览(只展示需求单与用例片段,避免日志过长)
try:
req_preview = requirement_info[:200].replace('\n', ' ')
cases_preview = test_cases_json[:200].replace('\n', ' ')
print(f"\n需求单片段: \n{req_preview}...")
print(f"用例JSON片段: \n{cases_preview}...\n")
except Exception:
pass
# 仅用例JSON的tokens(用于计算响应上限)
request_cases_tokens = self.token_counter.count_tokens(test_cases_json)
# 计算响应上限:取“2×用例”的理论值,并受总体预算限制
theoretical_response_cap = request_cases_tokens * 2
# 总体剩余额度(去掉预留、模板、需求及请求全部提示词后的剩余)
total_prompt_tokens = self.token_counter.count_tokens(final_prompt)
leftover_budget = max(0, self.max_context_tokens - self.reserve_tokens - total_prompt_tokens)
dynamic_response_tokens = max(64, min(theoretical_response_cap, leftover_budget))
# print(
# f"批量处理 {len(test_cases)} 个用例: "
# f"模板={self.template_base_tokens}, 需求={self.requirement_tokens}, 用例JSON≈{request_cases_tokens}, "
# f"完整请求≈{total_prompt_tokens}, 响应tokens限制≈{dynamic_response_tokens}(响应≈2×请求JSON)"
# )
print("正在调用AI进行评估...")
# 调用AI API(使用默认配置,支持环境变量自动检测)
result = await self.api_manager.call_llm(
prompt=final_prompt,
session=session,
max_tokens=self.max_context_tokens
)
print("AI评估完成,开始解析结果...")
return result
def parse_evaluation_result(self, ai_response: str) -> List[Dict[str, Any]]:
"""
解析AI评估结果,将Markdown表格转换为JSON
支持解析多个表格(当AI返回多个表格时)
参数:
ai_response: AI返回的Markdown表格
返回:
解析后的评估结果列表
"""
evaluations: List[Dict[str, Any]] = []
tables = MarkdownUtils.parse_markdown_tables(ai_response)
print(f"找到 {len(tables)} 个表格")
print("开始解析表格数据...")
if not tables:
print("未找到有效的表格数据")
return evaluations
# 将通用表格转换为业务结构
for idx, tbl in enumerate(tables):
headers = tbl.get("headers", [])
rows = tbl.get("rows", [])
print(f"解析第 {idx + 1} 个表格,包含 {len(rows)} 行")
current_case: Optional[Dict[str, Any]] = None
case_id: Optional[str] = None
# 逐行处理
for row in rows:
# 预期列:用例信息 | 分数 | 改进建议
if len(row) < 3:
continue
field_info, score, suggestion = row[0].strip(), row[1].strip(), row[2].strip()
if '**用例ID**' in field_info or 'ID' in field_info:
# 提取用例ID
if '<br>' in field_info:
id_part = field_info.split('<br>')[-1].strip()
else:
id_match = re.search(r'\d{8,}', field_info)
id_part = id_match.group() if id_match else field_info.replace('**用例ID**', '').strip()
case_id = id_part
print(f" 正在解析用例ID: {case_id}")
if current_case:
evaluations.append(current_case)
current_case = {'test_case_id': case_id, 'evaluations': []}
continue
if current_case and '**' in field_info:
if '<br>' in field_info:
field_parts = field_info.split('<br>')
field_name = field_parts[0].replace('**', '').strip()
field_content = '<br>'.join(field_parts[1:]).strip() if len(field_parts) > 1 else ''
field_content = field_content.replace('<br>', '\n')
else:
parts = field_info.split('**')
if len(parts) >= 3:
field_name = parts[1].strip()
field_content = '**'.join(parts[2:]).strip()
else:
field_name = field_info.replace('**', '').strip()
field_content = ''
field_name = field_name.replace('*', '').strip()
field_content = field_content.replace('*', '').strip()
print(f" 解析字段: {field_name} (分数: {score if score != '-' else '无'})")
evaluation_item = {
'field': field_name,
'content': field_content,
'score': score if score != '-' else None,
'suggestion': suggestion if suggestion != '-' else None,
}
current_case['evaluations'].append(evaluation_item)
if current_case:
evaluations.append(current_case)
print(f" 完成用例解析: {current_case['test_case_id']}")
print(f"成功解析 {len(evaluations)} 个用例的评估结果")
if evaluations:
first_case = evaluations[0]
print(f"示例解析结果 - 用例ID: {first_case['test_case_id']}, 评估项数: {len(first_case['evaluations'])}")
for item in first_case['evaluations'][:2]:
print(f" - {item['field']}: 分数={item['score']}, 建议={item['suggestion']}")
return evaluations
async def evaluate_test_cases(self, test_cases: List[Dict[str, Any]],
test_batch_count: Optional[int] = None) -> List[Dict[str, Any]]:
"""
评估测试用例
参数:
test_cases: 测试用例列表
test_batch_count: 测试数据批次,1表示只处理第一批
返回:
评估结果列表
"""
all_evaluations = []
current_index = 0
batch_number = 1
async with aiohttp.ClientSession() as session:
while current_index < len(test_cases):
# 如果设置了测试批次限制,检查是否超过
if test_batch_count and batch_number > test_batch_count:
print(f"达到测试批次限制 ({test_batch_count}),停止处理")
break
# 显示总体进度
remaining_cases = len(test_cases) - current_index
progress_percent = (current_index / len(test_cases)) * 100
print(f"\n总体进度: {current_index}/{len(test_cases)} ({progress_percent:.1f}%), 剩余 {remaining_cases} 个用例")
print(f"开始处理第 {batch_number} 批次...")
# 分割当前批次
batch_cases, next_index = self.split_test_cases_by_tokens(
test_cases, current_index
)
if not batch_cases:
print("没有更多测试用例可处理")
break
try:
# 评估当前批次
ai_result = await self.evaluate_batch(batch_cases, session)
print(f"AI返回结果长度: {len(ai_result)}")
print(f"AI返回结果字符预览: ================================================================================")
print(f"\n{ai_result}\n")
print("====================================================================================================")
# 解析结果
batch_evaluations = self.parse_evaluation_result(ai_result)
all_evaluations.extend(batch_evaluations)
# 显示本批次处理的用例ID
processed_ids = [eval_result['test_case_id'] for eval_result in batch_evaluations]
print(f"第 {batch_number} 批次处理完成,评估了 {len(batch_evaluations)} 个用例")
print(f"已完成评估的用例ID: {', '.join(processed_ids)}")
# 如果是测试模式且第一批次完成,询问是否继续
if test_batch_count == 1 and batch_number == 1:
print(f"\n第一批次测试完成,评估结果预览:")
if batch_evaluations:
first_eval = batch_evaluations[0]
print(f"用例ID: {first_eval['test_case_id']}")
print(f"评估项数量: {len(first_eval['evaluations'])}")
# 显示第一个评估项的详细信息
if first_eval['evaluations']:
first_item = first_eval['evaluations'][0]
print(f"示例评估 - {first_item['field']}: 分数={first_item.get('score', '无')}, 建议={first_item.get('suggestion', '无')}")
else:
print("解析评估结果失败,可能需要调整解析逻辑")
print(f"AI原始返回: {ai_result[:500]}...")
print("\n如需处理更多批次,请修改 test_batch_count 参数")
break
except Exception as e:
print(f"第 {batch_number} 批次处理失败: {str(e)}")
if test_batch_count == 1:
print("测试批次失败,请检查API配置和网络连接")
break
else:
print("跳过当前批次,继续处理下一批次")
current_index = next_index
batch_number += 1
# 批次间延迟,避免API限制
await asyncio.sleep(1)
return all_evaluations
async def main_process(test_batch_count: Optional[int] = None):
"""
主处理流程
参数:
test_batch_count: 测试数据批次限制,None表示处理所有数据
"""
config = get_config()
processor = TestCaseProcessor()
evaluator = TestCaseEvaluator(max_context_tokens=8000)
# 获取规则配置中的优先级比例要求
rules_config = get_test_case_rules()
priority_ratios = rules_config['priority_ratios']
# 文件选择:自动扫描 local_data 下的 .xlsx,并让用户交互选择
xlsx_files = sorted([p for p in config.local_data_path.glob("*.xlsx") if p.is_file()])
if not xlsx_files:
raise FileNotFoundError(f"未在目录中找到任何 .xlsx 文件: {config.local_data_path}")
print("\n检测到以下可处理的 Excel 文件:")
for idx, p in enumerate(xlsx_files, start=1):
print(f" {idx}. {p.name}")
selected_index: Optional[int] = None
while selected_index is None:
choice = input("\n请输入要处理的文件编号 (数字): ").strip()
if not choice.isdigit():
print("输入无效,请输入数字编号。")
continue
num = int(choice)
if num < 1 or num > len(xlsx_files):
print(f"编号超出范围,请输入 1~{len(xlsx_files)} 之间的数字。")
continue
selected_index = num - 1
excel_file = xlsx_files[selected_index]
base_name = excel_file.stem # 去除扩展名后的文件名
# 中间转换产物(如不存在则生成一次),仍按 <源名>.json 以便复用
json_file = config.local_data_path / f"{base_name}.json"
# 最终评估结果文件:Proceed_<用户选择的文件名>.json
result_file = config.local_data_path / f"Proceed_{base_name}.json"
print(f"\n已选择文件: {excel_file.name}")
print(f"转换JSON路径: {json_file.name}")
print(f"结果输出路径: {result_file.name}")
try:
# 步骤1: 检查Excel文件是否存在
if not excel_file.exists():
raise FileNotFoundError(f"Excel文件不存在: {excel_file}")
# 步骤2: 将Excel转换为JSON(如果JSON文件不存在)
if not json_file.exists():
print("开始转换Excel文件为JSON格式...")
test_cases = processor.excel_to_json(str(excel_file), str(json_file))
else:
print("JSON文件已存在,直接加载...")
with open(json_file, 'r', encoding='utf-8') as f:
test_cases = json.load(f)
print(f"加载了 {len(test_cases)} 条测试用例数据")
from datetime import datetime
from collections import Counter
start_time = datetime.now()
start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S")
print(f"\n开始处理时间: {start_time_str}")
# 分析测试用例的优先级分布
level_counter = Counter()
for test_case in test_cases:
level = test_case.get('level', '').strip().upper()
if level:
# 标准化优先级表示,例如P0、P1、P2
if level.startswith('P') and len(level) > 1 and level[1].isdigit():
level_counter[level] += 1
else:
level_counter['其他'] += 1
else:
level_counter['未设置'] += 1
# 计算各优先级占比
total_cases = len(test_cases)
level_percentages = {}
level_compliance = {}
print("\n测试用例优先级分布分析:")
for level, count in level_counter.items():
percentage = (count / total_cases) * 100
level_percentages[level] = percentage
# 检查是否符合规则配置
is_compliant = False
reason = "未找到对应规则"
if level in ['P0', 'P1', 'P2']:
level_key = level # 使用原始的P0、P1、P2作为键
if level_key in priority_ratios:
min_percent = priority_ratios[level_key]['min']
max_percent = priority_ratios[level_key]['max']
is_compliant = min_percent <= percentage <= max_percent
if is_compliant:
reason = f"符合要求:{min_percent}% ~ {max_percent}%"
else:
if percentage < min_percent:
reason = f"低于最小要求:{percentage:.1f}% < {min_percent}%"
else:
reason = f"超过最大要求:{percentage:.1f}% > {max_percent}%"
level_compliance[level] = {
"count": count,
"percentage": percentage,
"is_compliant": is_compliant,
"reason": reason
}
compliance_icon = "√" if is_compliant else "×"
print(f"{compliance_icon} {level}: {count} 条 ({percentage:.1f}%) - {reason}")
# 步骤3: AI评估
batch_limit_text = f",测试批次限制: {test_batch_count}" if test_batch_count else ",将处理所有数据"
print(f"\n开始AI评估{batch_limit_text}")
print(f"总计需要评估 {len(test_cases)} 个测试用例")
evaluations = await evaluator.evaluate_test_cases(test_cases, test_batch_count)
# 步骤4: 保存评估结果
if evaluations:
file_manager = get_file_manager()
# 包装为字典格式以符合save_json_data的接口,并添加优先级分析结果
result_data = {
"evaluation_results": evaluations,
"total_count": len(evaluations),
# 以所选源文件名(不含扩展名)作为生成标识
"generated_at": base_name,
"process_start_time": start_time_str,
"process_end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"priority_analysis": {
"distribution": level_percentages,
"compliance": level_compliance,
"rules": priority_ratios
}
}
file_manager.save_json_data(result_data, str(result_file))
print(f"\n评估完成!结果已保存到: {result_file}")
print(f"共评估了 {len(evaluations)} 个测试用例")
else:
print("\n没有生成评估结果")
except Exception as e:
print(f"处理失败: {str(e)}")
raise
# 计算处理时间
end_time = datetime.now()
end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S")
total_seconds = (end_time - start_time).total_seconds()
print(f"\n结束处理时间: {end_time_str}")
print(f"总处理时间: {total_seconds:.2f} 秒")
# 计算平均每条数据的处理时间
if evaluations and len(evaluations) > 0:
avg_time_per_case = total_seconds / len(evaluations)
print(f"平均每条数据处理时间: {avg_time_per_case:.2f} 秒/条")
else:
print("无法计算平均处理时间:没有成功处理的数据")
if __name__ == "__main__":
print(
r'''
______ __ __ ______ __ __ __ ______ ______ ______ ______
/\ ___\ /\ \ / / /\ __ \ /\ \ /\ \/\ \ /\ __ \ /\__ _\ /\ __ \ /\ == \
\ \ __\ \ \ \'/ \ \ __ \ \ \ \____ \ \ \_\ \ \ \ __ \ \/_/\ \/ \ \ \/\ \ \ \ __<
\ \_____\ \ \__| \ \_\ \_\ \ \_____\ \ \_____\ \ \_\ \_\ \ \_\ \ \_____\ \ \_\ \_\
\/_____/ \/_/ \/_/\/_/ \/_____/ \/_____/ \/_/\/_/ \/_/ \/_____/ \/_/ /_/
正在加载本地配置...
''')
# 测试模式:处理3批数据
# asyncio.run(main_process(test_batch_count=3))
# 正式模式:处理所有数据
asyncio.run(main_process()) # 不传入test_batch_count参数,将处理所有数据