Skip to main content
Glama

TAPD Data Fetcher

test_case_evaluator.py37.3 kB
""" AI 测试用例评估器 本脚本提供了一个完整的解决方案,用于从 Excel 文件中读取测试用例, 利用大型语言模型(LLM)进行自动化评估,并生成详细的改进建议。 它集成了动态token管理、自定义评估规则和需求单知识库,以实现高效、准确的评估流程。 核心功能: 1. **数据处理**:从 Excel 文件读取测试用例,并将其转换为结构化的 JSON 格式。 2. **Token管理**:使用统一的 TokenCounter(来自 common_utils),支持精确的 Token 计数器 (优先使用 `transformers` 库),并根据动态计算的提示词长度自动将大量测试用例分割成 适合模型上下文窗口的批次。 3. **动态提示词**:基于 `test_case_rules_customer.py` 中的自定义规则(如标题长度、步骤数) 构建评估提示词,确保评估标准的一致性和灵活性。 4. **知识库集成**:关联 `test_case_require_list_knowledge_base.py` 中的需求单知识库, 在评估时为 LLM 提供相关的需求背景信息,提高评估的准确性。 5. **异步评估**:使用 `aiohttp` 进行异步 API 调用,并行处理多个评估请求,提高处理效率。 6. **结果解析**:能够解析 LLM 返回的 Markdown 表格格式的评估结果,并将其转换为结构化的 JSON 数据,便于后续分析和存储。 类与模块说明: - `TestCaseProcessor`: 处理测试用例数据。主要功能是将 Excel 文件转换为 JSON 格式, 并进行必要的字段映射和数据清理。 - `TestCaseEvaluator`: 核心评估器类。 - `__init__`: 初始化评估器,加载规则配置和需求单知识库,并根据模型上下文限制计算 token 分配策略。 - `_build_dynamic_prompt_template`: 根据规则动态构建发送给 LLM 的提示词模板。 - `estimate_batch_tokens`: 估算一个批次的测试用例在组合成单个请求后所需的 token 总量。 - `split_test_cases_by_tokens`: 根据 token 阈值将所有用例分割成多个批次。 - `evaluate_batch`: 对单个批次的测试用例执行 AI 评估,发送异步请求并获取结果。 - `parse_evaluation_result`: 解析 AI 返回的 Markdown 格式的评估结果。 - `evaluate_test_cases`: 编排整个评估流程,从批次分割到结果汇总。 处理流程: 1. `main_process` 函数启动处理流程。 2. `TestCaseProcessor` 读取 Excel 文件并转换为 JSON。 3. `TestCaseEvaluator` 加载所有测试用例。 4. `split_test_cases_by_tokens` 方法根据 `token_threshold` 将用例分割成多个批次。 5. 对于每个批次,`evaluate_batch` 方法构建一个包含该批次所有用例的提示词,并异步调用 LLM API。 6. `parse_evaluation_result` 方法解析返回的 Markdown 表格,提取每个用例的评分和建议。 7. 所有批次处理完成后,结果被汇总并保存到 `Proceed_TestCase_...json` 文件中。 注意: - TokenCounter 类已迁移至 common_utils.py 统一管理,本脚本通过 get_token_counter() 获取实例 - 集成了统一的配置管理、文件管理和API管理接口 """ import re import json import asyncio import aiohttp import pandas as pd import sys from typing import List, Dict, Any, Optional, Tuple from pathlib import Path # 添加项目根目录和mcp_tools目录到路径 current_dir = Path(__file__).parent project_root = current_dir.parent mcp_tools_path = project_root / "mcp_tools" if str(project_root) not in sys.path: sys.path.insert(0, str(project_root)) if str(mcp_tools_path) not in sys.path: sys.path.insert(0, str(mcp_tools_path)) # 导入公共工具 from common_utils import ( get_config, get_api_manager, get_file_manager, get_token_counter, BatchingUtils, MarkdownUtils, TokenBudgetUtils, ) from test_case_rules_customer import get_test_case_rules from test_case_require_list_knowledge_base import RequirementKnowledgeBase class TestCaseProcessor: """Excel测试用例处理器""" def __init__(self): self.config = get_config() self.file_manager = get_file_manager() def excel_to_json(self, excel_file_path: str, json_file_path: str) -> List[Dict[str, Any]]: """ 将Excel文件转换为JSON格式 参数: excel_file_path: Excel文件路径 json_file_path: 输出JSON文件路径 返回: 转换后的数据列表 """ try: column_mapping = { '用例ID': 'test_case_id', 'UUID': 'UUID', '用例标题': 'test_case_title', '用例目录': 'test_case_menu', '是否自动化': 'is_automatic', '等级': 'level', '前置条件': 'prerequisites', '步骤描述类型': 'step_description_type', '步骤描述': 'step_description', '预期结果': 'expected_result', } json_data = self.file_manager.read_excel_with_mapping(excel_file_path, column_mapping) with open(json_file_path, 'w', encoding='utf-8') as f: json.dump(json_data, f, ensure_ascii=False, indent=2) print(f"成功转换 {len(json_data)} 条测试用例数据到 {json_file_path}") return json_data except Exception as e: raise RuntimeError(f"Excel转JSON失败: {e}") class TestCaseEvaluator: """测试用例AI评估器""" def __init__(self, max_context_tokens: int = 8000): self.config = get_config() self.api_manager = get_api_manager() self.file_manager = get_file_manager() self.token_counter = get_token_counter() # 初始化需求单知识库 self.requirement_kb = RequirementKnowledgeBase() # 加载自定义规则配置 self.rules_config = get_test_case_rules() print(f"已加载测试用例评估规则配置:") print(f" - 标题长度≤{self.rules_config['title_max_length']}字符") print(f" - 步骤数≤{self.rules_config['max_steps']}步") print(f" - 优先级占比配置:") for priority, ratio in self.rules_config['priority_ratios'].items(): print(f" * {priority}: {ratio['min']}-{ratio['max']}%") print(f"需求单知识库: 已加载 {len(self.requirement_kb.requirements)} 个需求单") # 上下文管理与预算(新): # max_context_tokens 由以下部分组成: # 20%预留tokens + 需求单tokens + 模板提示词tokens + 剩余可用tokens # 剩余可用tokens = 待处理用例tokens + 返回表格tokens(约等于待处理用例tokens) # 即:reserve + template + requirements + 2 * test_cases_tokens <= max_context self.max_context_tokens = max_context_tokens # 构建动态评估提示词模板 self.evaluation_prompt_template = self._build_dynamic_prompt_template() # 计算提示词模板的基础token数量(不包含动态需求单与测试用例数据) template_with_rules = self.evaluation_prompt_template template_base = ( template_with_rules .replace('{test_case_id}', '') .replace('{test_case_title}', '') .replace('{prerequisites}', '') .replace('{step_description}', '') .replace('{expected_result}', '') .replace('{test_cases_json}', '') .replace('{requirement_info}', '') ) self.template_base_tokens = self.token_counter.count_tokens(template_base) # 需求单信息替换后单独计算tokens(全局一次,批次内复用) self.requirement_info_text = self.requirement_kb.get_requirements_for_evaluation() self.requirement_tokens = self.token_counter.count_tokens(self.requirement_info_text) # 20% 预留 self.reserve_tokens = max(64, int(self.max_context_tokens * 0.20)) # 动态可用预算(用于 请求用例 + 响应表格) self.dynamic_pair_budget = self.max_context_tokens - ( self.reserve_tokens + self.template_base_tokens + self.requirement_tokens ) if self.dynamic_pair_budget <= 0: # 模板或需求单过大,给出降级提示并尽量留出最小对话空间 print( f"警告:模板或需求单内容过大(模板={self.template_base_tokens}, 需求={self.requirement_tokens}), " f"已超出窗口预算。将采用最小对称预算进行计算。" ) self.dynamic_pair_budget = max(256, int(self.max_context_tokens * 0.20)) # 单边预算(仅请求用例),在“响应≈2×请求”的假设下: # reserve + template + requirements + (request + response) # ≈ reserve + template + requirements + (request + 2×request) # ≈ 固定项 + 3×request <= max_context # 因此 request 的安全上限 ≈ dynamic_pair_budget / 3 self.single_side_budget = max(64, self.dynamic_pair_budget // 3) print( "Token预算配置: " f"总上下文={max_context_tokens}, 预留={self.reserve_tokens}, 模板={self.template_base_tokens}, 需求={self.requirement_tokens}, " f"可用(请求+响应)={self.dynamic_pair_budget}, 单边(请求)≈{self.single_side_budget}(响应≈2×请求JSON)" ) def _build_dynamic_prompt_template(self) -> str: """ 根据自定义规则配置构建动态提示词模板 返回: str: 动态生成的提示词模板 """ title_max_length = self.rules_config['title_max_length'] max_steps = self.rules_config['max_steps'] priority_ratios = self.rules_config['priority_ratios'] # 构建优先级占比说明 p0_range = f"{priority_ratios['P0']['min']}%~{priority_ratios['P0']['max']}%" p1_range = f"{priority_ratios['P1']['min']}%~{priority_ratios['P1']['max']}%" p2_range = f"{priority_ratios['P2']['min']}%~{priority_ratios['P2']['max']}%" template = f"""你需要为一批业务需求用例进行打分与评估。请为以下测试用例分别生成评估表格。每个用例应该有自己独立的表格。 ## 重要提示: 1. 请为每个用例生成一个完整的表格 2. 每个表格都应该包含表头 "| 用例信息 | 分数 | 改进建议 |" 3. 每个表格之间用空行分隔 4. 严格按照提供的表格格式输出 5. **除此以外不需要任何分析或解释** 6. **必须从测试用例JSON数据中提取真实的字段内容,不要使用任何占位符** ## 评分规则: | 用例要素 | 是否必须 | 要求 | | ---- | ---- | -------------------------------------------------------------------------------------------------------------------------- | | 关联需求 | 是 | 1. 用例应当与需求单中的一条或多条有关 | | 用例标题 | 是 | 1. 标题长度不超过 {title_max_length} 字符,且描述测试功能点<br>2. 语言清晰,简洁易懂<br>3. 避免在用例评分中出现步骤 | | 前置条件 | 否 | 1. 列出所有前提:账号类型,灰度等<br>2. 避免过度复杂:每个条件不超过 2 项描述,必要时分点列出 | | 测试步骤 | 是 | 1. 步骤用编号链接:使用 1、2、3... 结构,每步描述一个动作<br>2. 步骤具体化:包含用户操作、输入值和上下文说明<br>3. 步骤数合理:不超过 {max_steps} 步,否则需分解为多个用例。<br>4. 避免步骤中带有检查点 | | 预期结果 | 是 | 1. 描述明确结果:明确的结果,确切的检查<br>2. 避免模棱两可的词(如功能正常,跟现网一致等) | | 优先级 | 是 | 1. 等级划分标准:采用 P0-P2<br>2. 明确标注:每个用例必须有优先级字段。<br>3. 优先级比例:P0占比应在 {p0_range} 之间,P1占比 {p1_range},P2占比 {p2_range} | * 所有评分满分为 10 分,未提供必须提供的字段给0分,每有一点要求未满足则酌情扣1-2分,最低 0 分。 * 未提供前置条件时,给 -1 分,便于后期横向对比。 * 若用例与需求单中任何一条需求都无关,则给0分。若与某一条需求相关程度较高,则根据相关程度给出分数。若同时与多条需求相关,则根据相关程度综合评分。 * 对低于 10 分的要素给出准确的建议。给出的每一条建议都应当具体明确,且不超过100字。 ## 关联需求单信息: {{requirement_info}} ## 请为每个测试用例生成如下格式的表格 | 用例信息 | 分数 | 改进建议 | | ----------------------------------------- | --- | ------------------- | | **用例ID**<br>[从JSON中提取真实的test_case_id] | - | - | | **用例标题**<br>[从JSON中提取真实的test_case_title] | 8 | 改为“验证错误密码登录的失败提示” | | **前置条件**<br>[从JSON中提取真实的prerequisites] | 8 | 补充系统版本要求 | | **测试步骤**<br>[从JSON中提取真实的step_description] | 6 | 步骤3增加“等待3秒” | | **预期结果**<br>[从JSON中提取真实的expected_result] | 7 | 明确提示位置(如:输入框下方红色文字) | 表格中的“改进建议”内容仅供参考,提供建议时需要尽可能考虑到业务的方方面面,而不只是局限于表格模板。 ## 测试用例JSON数据 {{test_cases_json}} """ return template def estimate_batch_tokens(self, test_cases: List[Dict[str, Any]]) -> int: """ 估算一批“仅测试用例数据部分”的token数量(不含模板与需求单)。 参数: test_cases: 测试用例列表 返回: 仅测试用例JSON文本的token估算 """ if not test_cases: return 0 # 仅对测试用例JSON进行计数,模板与需求单已单独预计算 test_cases_json = json.dumps(test_cases, ensure_ascii=False, indent=2) return self.token_counter.count_tokens(test_cases_json) def split_test_cases_by_tokens(self, test_cases: List[Dict[str, Any]], start_index: int = 0) -> Tuple[List[Dict[str, Any]], int]: """ 根据token限制分割测试用例 参数: test_cases: 全部测试用例列表 start_index: 开始索引 返回: (当前批次的测试用例, 下一批次的开始索引) """ # 基于“响应≈2×请求”的单边预算进行分批(仅统计请求侧的用例JSON部分) batch, next_index, current_tokens = BatchingUtils.split_by_token_budget( test_cases, estimate_tokens_fn=self.estimate_batch_tokens, token_threshold=self.single_side_budget, start_index=start_index, ) # 在“响应≈2×请求”的假设下,响应上限按“2×用例JSON tokens”取值 request_tokens_est = current_tokens # 仅用例JSON部分 # 再次基于全局预算做一次安全裁剪,避免边界超限 max_pairs_left = max( 0, self.max_context_tokens - (self.reserve_tokens + self.template_base_tokens + self.requirement_tokens) ) allowed_response_by_pairs = max_pairs_left - request_tokens_est theoretical_response_cap = request_tokens_est * 2 # 受理论上限与剩余额度共同限制 response_tokens_cap = max(64, min(theoretical_response_cap, allowed_response_by_pairs)) total_estimated_tokens = ( self.reserve_tokens + self.template_base_tokens + self.requirement_tokens + request_tokens_est + response_tokens_cap ) # 显示当前批次包含的测试用例ID与token预算 batch_ids = [case.get('test_case_id', 'N/A') for case in batch] print( f"当前批次包含 {len(batch)} 个测试用例,预计tokens: 模板={self.template_base_tokens}, 需求={self.requirement_tokens}, " f"请求≈{request_tokens_est}, 响应上限≈{response_tokens_cap}, 预留={self.reserve_tokens}, " f"合计≈{total_estimated_tokens}(窗口={self.max_context_tokens})" ) print(f"批次包含的用例ID: {', '.join(batch_ids)}") return batch, next_index async def evaluate_batch(self, test_cases: List[Dict[str, Any]], session: aiohttp.ClientSession) -> str: """ 评估一批测试用例 参数: test_cases: 测试用例列表 session: HTTP会话 返回: AI评估结果 """ # 构建批量提示词 - 一次性处理多个测试用例 test_cases_json = json.dumps(test_cases, ensure_ascii=False, indent=2) # 使用全局已缓存的需求单信息 requirement_info = self.requirement_info_text # 构建最终提示词 final_prompt = self.evaluation_prompt_template.format( requirement_info=requirement_info, test_cases_json=test_cases_json ) # 运行期注入校验与简短预览,帮助定位“未注入需求单”的问题 try: assert "需求单信息" in final_prompt, "提示词中缺少需求单信息标题" if requirement_info.strip(): # 取需求单首行进行存在性检查 first_line = requirement_info.strip().splitlines()[0] if first_line: assert first_line in final_prompt, "需求单文本可能未正确注入提示词" except AssertionError as _e: print(f"[警告] 需求单注入校验失败: {_e}") # 调试预览(只展示需求单与用例片段,避免日志过长) try: req_preview = requirement_info[:200].replace('\n', ' ') cases_preview = test_cases_json[:200].replace('\n', ' ') print(f"\n需求单片段: \n{req_preview}...") print(f"用例JSON片段: \n{cases_preview}...\n") except Exception: pass # 仅用例JSON的tokens(用于计算响应上限) request_cases_tokens = self.token_counter.count_tokens(test_cases_json) # 计算响应上限:取“2×用例”的理论值,并受总体预算限制 theoretical_response_cap = request_cases_tokens * 2 # 总体剩余额度(去掉预留、模板、需求及请求全部提示词后的剩余) total_prompt_tokens = self.token_counter.count_tokens(final_prompt) leftover_budget = max(0, self.max_context_tokens - self.reserve_tokens - total_prompt_tokens) dynamic_response_tokens = max(64, min(theoretical_response_cap, leftover_budget)) # print( # f"批量处理 {len(test_cases)} 个用例: " # f"模板={self.template_base_tokens}, 需求={self.requirement_tokens}, 用例JSON≈{request_cases_tokens}, " # f"完整请求≈{total_prompt_tokens}, 响应tokens限制≈{dynamic_response_tokens}(响应≈2×请求JSON)" # ) print("正在调用AI进行评估...") # 调用AI API(使用默认配置,支持环境变量自动检测) result = await self.api_manager.call_llm( prompt=final_prompt, session=session, max_tokens=self.max_context_tokens ) print("AI评估完成,开始解析结果...") return result def parse_evaluation_result(self, ai_response: str) -> List[Dict[str, Any]]: """ 解析AI评估结果,将Markdown表格转换为JSON 支持解析多个表格(当AI返回多个表格时) 参数: ai_response: AI返回的Markdown表格 返回: 解析后的评估结果列表 """ evaluations: List[Dict[str, Any]] = [] tables = MarkdownUtils.parse_markdown_tables(ai_response) print(f"找到 {len(tables)} 个表格") print("开始解析表格数据...") if not tables: print("未找到有效的表格数据") return evaluations # 将通用表格转换为业务结构 for idx, tbl in enumerate(tables): headers = tbl.get("headers", []) rows = tbl.get("rows", []) print(f"解析第 {idx + 1} 个表格,包含 {len(rows)} 行") current_case: Optional[Dict[str, Any]] = None case_id: Optional[str] = None # 逐行处理 for row in rows: # 预期列:用例信息 | 分数 | 改进建议 if len(row) < 3: continue field_info, score, suggestion = row[0].strip(), row[1].strip(), row[2].strip() if '**用例ID**' in field_info or 'ID' in field_info: # 提取用例ID if '<br>' in field_info: id_part = field_info.split('<br>')[-1].strip() else: id_match = re.search(r'\d{8,}', field_info) id_part = id_match.group() if id_match else field_info.replace('**用例ID**', '').strip() case_id = id_part print(f" 正在解析用例ID: {case_id}") if current_case: evaluations.append(current_case) current_case = {'test_case_id': case_id, 'evaluations': []} continue if current_case and '**' in field_info: if '<br>' in field_info: field_parts = field_info.split('<br>') field_name = field_parts[0].replace('**', '').strip() field_content = '<br>'.join(field_parts[1:]).strip() if len(field_parts) > 1 else '' field_content = field_content.replace('<br>', '\n') else: parts = field_info.split('**') if len(parts) >= 3: field_name = parts[1].strip() field_content = '**'.join(parts[2:]).strip() else: field_name = field_info.replace('**', '').strip() field_content = '' field_name = field_name.replace('*', '').strip() field_content = field_content.replace('*', '').strip() print(f" 解析字段: {field_name} (分数: {score if score != '-' else '无'})") evaluation_item = { 'field': field_name, 'content': field_content, 'score': score if score != '-' else None, 'suggestion': suggestion if suggestion != '-' else None, } current_case['evaluations'].append(evaluation_item) if current_case: evaluations.append(current_case) print(f" 完成用例解析: {current_case['test_case_id']}") print(f"成功解析 {len(evaluations)} 个用例的评估结果") if evaluations: first_case = evaluations[0] print(f"示例解析结果 - 用例ID: {first_case['test_case_id']}, 评估项数: {len(first_case['evaluations'])}") for item in first_case['evaluations'][:2]: print(f" - {item['field']}: 分数={item['score']}, 建议={item['suggestion']}") return evaluations async def evaluate_test_cases(self, test_cases: List[Dict[str, Any]], test_batch_count: Optional[int] = None) -> List[Dict[str, Any]]: """ 评估测试用例 参数: test_cases: 测试用例列表 test_batch_count: 测试数据批次,1表示只处理第一批 返回: 评估结果列表 """ all_evaluations = [] current_index = 0 batch_number = 1 async with aiohttp.ClientSession() as session: while current_index < len(test_cases): # 如果设置了测试批次限制,检查是否超过 if test_batch_count and batch_number > test_batch_count: print(f"达到测试批次限制 ({test_batch_count}),停止处理") break # 显示总体进度 remaining_cases = len(test_cases) - current_index progress_percent = (current_index / len(test_cases)) * 100 print(f"\n总体进度: {current_index}/{len(test_cases)} ({progress_percent:.1f}%), 剩余 {remaining_cases} 个用例") print(f"开始处理第 {batch_number} 批次...") # 分割当前批次 batch_cases, next_index = self.split_test_cases_by_tokens( test_cases, current_index ) if not batch_cases: print("没有更多测试用例可处理") break try: # 评估当前批次 ai_result = await self.evaluate_batch(batch_cases, session) print(f"AI返回结果长度: {len(ai_result)}") print(f"AI返回结果字符预览: ================================================================================") print(f"\n{ai_result}\n") print("====================================================================================================") # 解析结果 batch_evaluations = self.parse_evaluation_result(ai_result) all_evaluations.extend(batch_evaluations) # 显示本批次处理的用例ID processed_ids = [eval_result['test_case_id'] for eval_result in batch_evaluations] print(f"第 {batch_number} 批次处理完成,评估了 {len(batch_evaluations)} 个用例") print(f"已完成评估的用例ID: {', '.join(processed_ids)}") # 如果是测试模式且第一批次完成,询问是否继续 if test_batch_count == 1 and batch_number == 1: print(f"\n第一批次测试完成,评估结果预览:") if batch_evaluations: first_eval = batch_evaluations[0] print(f"用例ID: {first_eval['test_case_id']}") print(f"评估项数量: {len(first_eval['evaluations'])}") # 显示第一个评估项的详细信息 if first_eval['evaluations']: first_item = first_eval['evaluations'][0] print(f"示例评估 - {first_item['field']}: 分数={first_item.get('score', '无')}, 建议={first_item.get('suggestion', '无')}") else: print("解析评估结果失败,可能需要调整解析逻辑") print(f"AI原始返回: {ai_result[:500]}...") print("\n如需处理更多批次,请修改 test_batch_count 参数") break except Exception as e: print(f"第 {batch_number} 批次处理失败: {str(e)}") if test_batch_count == 1: print("测试批次失败,请检查API配置和网络连接") break else: print("跳过当前批次,继续处理下一批次") current_index = next_index batch_number += 1 # 批次间延迟,避免API限制 await asyncio.sleep(1) return all_evaluations async def main_process(test_batch_count: Optional[int] = None): """ 主处理流程 参数: test_batch_count: 测试数据批次限制,None表示处理所有数据 """ config = get_config() processor = TestCaseProcessor() evaluator = TestCaseEvaluator(max_context_tokens=8000) # 获取规则配置中的优先级比例要求 rules_config = get_test_case_rules() priority_ratios = rules_config['priority_ratios'] # 文件选择:自动扫描 local_data 下的 .xlsx,并让用户交互选择 xlsx_files = sorted([p for p in config.local_data_path.glob("*.xlsx") if p.is_file()]) if not xlsx_files: raise FileNotFoundError(f"未在目录中找到任何 .xlsx 文件: {config.local_data_path}") print("\n检测到以下可处理的 Excel 文件:") for idx, p in enumerate(xlsx_files, start=1): print(f" {idx}. {p.name}") selected_index: Optional[int] = None while selected_index is None: choice = input("\n请输入要处理的文件编号 (数字): ").strip() if not choice.isdigit(): print("输入无效,请输入数字编号。") continue num = int(choice) if num < 1 or num > len(xlsx_files): print(f"编号超出范围,请输入 1~{len(xlsx_files)} 之间的数字。") continue selected_index = num - 1 excel_file = xlsx_files[selected_index] base_name = excel_file.stem # 去除扩展名后的文件名 # 中间转换产物(如不存在则生成一次),仍按 <源名>.json 以便复用 json_file = config.local_data_path / f"{base_name}.json" # 最终评估结果文件:Proceed_<用户选择的文件名>.json result_file = config.local_data_path / f"Proceed_{base_name}.json" print(f"\n已选择文件: {excel_file.name}") print(f"转换JSON路径: {json_file.name}") print(f"结果输出路径: {result_file.name}") try: # 步骤1: 检查Excel文件是否存在 if not excel_file.exists(): raise FileNotFoundError(f"Excel文件不存在: {excel_file}") # 步骤2: 将Excel转换为JSON(如果JSON文件不存在) if not json_file.exists(): print("开始转换Excel文件为JSON格式...") test_cases = processor.excel_to_json(str(excel_file), str(json_file)) else: print("JSON文件已存在,直接加载...") with open(json_file, 'r', encoding='utf-8') as f: test_cases = json.load(f) print(f"加载了 {len(test_cases)} 条测试用例数据") from datetime import datetime from collections import Counter start_time = datetime.now() start_time_str = start_time.strftime("%Y-%m-%d %H:%M:%S") print(f"\n开始处理时间: {start_time_str}") # 分析测试用例的优先级分布 level_counter = Counter() for test_case in test_cases: level = test_case.get('level', '').strip().upper() if level: # 标准化优先级表示,例如P0、P1、P2 if level.startswith('P') and len(level) > 1 and level[1].isdigit(): level_counter[level] += 1 else: level_counter['其他'] += 1 else: level_counter['未设置'] += 1 # 计算各优先级占比 total_cases = len(test_cases) level_percentages = {} level_compliance = {} print("\n测试用例优先级分布分析:") for level, count in level_counter.items(): percentage = (count / total_cases) * 100 level_percentages[level] = percentage # 检查是否符合规则配置 is_compliant = False reason = "未找到对应规则" if level in ['P0', 'P1', 'P2']: level_key = level # 使用原始的P0、P1、P2作为键 if level_key in priority_ratios: min_percent = priority_ratios[level_key]['min'] max_percent = priority_ratios[level_key]['max'] is_compliant = min_percent <= percentage <= max_percent if is_compliant: reason = f"符合要求:{min_percent}% ~ {max_percent}%" else: if percentage < min_percent: reason = f"低于最小要求:{percentage:.1f}% < {min_percent}%" else: reason = f"超过最大要求:{percentage:.1f}% > {max_percent}%" level_compliance[level] = { "count": count, "percentage": percentage, "is_compliant": is_compliant, "reason": reason } compliance_icon = "√" if is_compliant else "×" print(f"{compliance_icon} {level}: {count} 条 ({percentage:.1f}%) - {reason}") # 步骤3: AI评估 batch_limit_text = f",测试批次限制: {test_batch_count}" if test_batch_count else ",将处理所有数据" print(f"\n开始AI评估{batch_limit_text}") print(f"总计需要评估 {len(test_cases)} 个测试用例") evaluations = await evaluator.evaluate_test_cases(test_cases, test_batch_count) # 步骤4: 保存评估结果 if evaluations: file_manager = get_file_manager() # 包装为字典格式以符合save_json_data的接口,并添加优先级分析结果 result_data = { "evaluation_results": evaluations, "total_count": len(evaluations), # 以所选源文件名(不含扩展名)作为生成标识 "generated_at": base_name, "process_start_time": start_time_str, "process_end_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "priority_analysis": { "distribution": level_percentages, "compliance": level_compliance, "rules": priority_ratios } } file_manager.save_json_data(result_data, str(result_file)) print(f"\n评估完成!结果已保存到: {result_file}") print(f"共评估了 {len(evaluations)} 个测试用例") else: print("\n没有生成评估结果") except Exception as e: print(f"处理失败: {str(e)}") raise # 计算处理时间 end_time = datetime.now() end_time_str = end_time.strftime("%Y-%m-%d %H:%M:%S") total_seconds = (end_time - start_time).total_seconds() print(f"\n结束处理时间: {end_time_str}") print(f"总处理时间: {total_seconds:.2f} 秒") # 计算平均每条数据的处理时间 if evaluations and len(evaluations) > 0: avg_time_per_case = total_seconds / len(evaluations) print(f"平均每条数据处理时间: {avg_time_per_case:.2f} 秒/条") else: print("无法计算平均处理时间:没有成功处理的数据") if __name__ == "__main__": print( r''' ______ __ __ ______ __ __ __ ______ ______ ______ ______ /\ ___\ /\ \ / / /\ __ \ /\ \ /\ \/\ \ /\ __ \ /\__ _\ /\ __ \ /\ == \ \ \ __\ \ \ \'/ \ \ __ \ \ \ \____ \ \ \_\ \ \ \ __ \ \/_/\ \/ \ \ \/\ \ \ \ __< \ \_____\ \ \__| \ \_\ \_\ \ \_____\ \ \_____\ \ \_\ \_\ \ \_\ \ \_____\ \ \_\ \_\ \/_____/ \/_/ \/_/\/_/ \/_____/ \/_____/ \/_/\/_/ \/_/ \/_____/ \/_/ /_/ 正在加载本地配置... ''') # 测试模式:处理3批数据 # asyncio.run(main_process(test_batch_count=3)) # 正式模式:处理所有数据 asyncio.run(main_process()) # 不传入test_batch_count参数,将处理所有数据

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/OneCuriousLearner/MCPAgentRE'

If you have feedback or need assistance with the MCP directory API, please join our Discord server