Skip to main content
Glama

baidu-ai-search

Official
by baidubce
parse_tests_and_coverage.py14.8 kB
#!/usr/bin/env python3 import os import sys import subprocess import glob # 添加这一行 from lxml import etree from git import Repo from collections import defaultdict def parse_surefire_reports(): """解析 Maven Surefire 生成的测试报告,打印每个测试的结果,并汇总测试统计信息。""" surefire_reports = glob.glob('target/surefire-reports/*.xml') if not surefire_reports: print("未找到 Surefire 报告文件(target/surefire-reports/*.xml)。") sys.stdout.flush() # 立即刷新输出 return None total_tests = 0 total_failures = 0 total_errors = 0 total_skipped = 0 passed_tests = [] failed_tests = [] print("\n=== 测试结果 ===") sys.stdout.flush() # 立即刷新输出 for report in surefire_reports: try: tree = etree.parse(report) root = tree.getroot() test_suite_name = root.get('name', 'Unknown Test Suite') for test_case in root.findall('testcase'): test_name = test_case.get('name') class_name = test_case.get('classname') status = "OK" error_message = "" if test_case.find('failure') is not None: status = "FAIL" error = test_case.find('failure') error_message = error.text.strip() if error.text else "未知错误" failed_tests.append((class_name, test_name, error_message)) total_failures += 1 elif test_case.find('error') is not None: status = "ERROR" error = test_case.find('error') error_message = error.text.strip() if error.text else "未知错误" failed_tests.append((class_name, test_name, error_message)) total_errors += 1 elif test_case.find('skipped') is not None: status = "SKIPPED" total_skipped += 1 else: passed_tests.append((class_name, test_name)) total_tests += 1 # 打印每个测试的结果 if status == "OK": print(f"[OK] {class_name}.{test_name}") elif status in ["FAIL", "ERROR"]: print(f"[{status}] {class_name}.{test_name}") print(f" 错误信息: {error_message}") elif status == "SKIPPED": print(f"[SKIPPED] {class_name}.{test_name}") sys.stdout.flush() # 立即刷新输出 except etree.XMLSyntaxError as e: print(f"XML 解析错误在文件 {report}: {e}") sys.stdout.flush() # 立即刷新输出 except Exception as e: print(f"解析报告文件 {report} 时发生错误: {e}") sys.stdout.flush() # 立即刷新输出 # 打印测试总结 print("\n=== 测试总结 ===") print(f"总测试数: {total_tests}") print(f"成功: {len(passed_tests)}") print(f"失败: {total_failures}") print(f"错误: {total_errors}") print(f"跳过: {total_skipped}") sys.stdout.flush() # 立即刷新输出 return { "total_tests": total_tests, "passed": len(passed_tests), "failures": total_failures, "errors": total_errors, "skipped": total_skipped } def parse_jacoco(): """解析 JaCoCo XML 报告,打印覆盖率详情并返回包含类名和未覆盖行号的字典。""" JACOCO_XML = "target/site/jacoco/jacoco.xml" if not os.path.isfile(JACOCO_XML): print("jacoco.xml 未找到。请先运行 'mvn jacoco:report' 生成报告。") sys.stdout.flush() # 立即刷新输出 return {} try: parser = etree.XMLParser(load_dtd=False, no_network=True, recover=True) tree = etree.parse(JACOCO_XML, parser) root = tree.getroot() except etree.XMLSyntaxError as e: print(f"XML 解析错误: {e}") sys.stdout.flush() # 立即刷新输出 return {} except Exception as e: print(f"解析 jacoco.xml 时发生错误: {e}") sys.stdout.flush() # 立即刷新输出 return {} # 初始化总计数器 total_lines_missed = 0 total_lines_covered = 0 total_branches_missed = 0 total_branches_covered = 0 # 创建数据表 table_data = [["Class", "Line Coverage (%)", "Branch Coverage (%)", "Missing Lines", "Missing Branches"]] # 初始化存储结果的字典 coverage_data = {} # 遍历所有 class 元素 for class_element in root.findall('.//class'): class_name = class_element.get('name') # 行覆盖率 counter_line = class_element.find("counter[@type='LINE']") if counter_line is not None: lines_missed = int(counter_line.get('missed', '0')) lines_covered = int(counter_line.get('covered', '0')) else: lines_missed = 0 lines_covered = 0 total_lines_missed += lines_missed total_lines_covered += lines_covered # 分支覆盖率 counter_branch = class_element.find("counter[@type='BRANCH']") if counter_branch is not None: branches_missed = int(counter_branch.get('missed', '0')) branches_covered = int(counter_branch.get('covered', '0')) total_branches = branches_missed + branches_covered if total_branches > 0: branch_coverage = f"{(branches_covered / total_branches) * 100:.2f}%" else: branch_coverage = "N/A" else: branches_missed = 0 branches_covered = 0 branch_coverage = "N/A" total_lines_missed += lines_missed total_lines_covered += lines_covered total_branches_missed += branches_missed total_branches_covered += branches_covered # 获取 sourcefilename 属性 sourcefilename = class_element.get('sourcefilename') if sourcefilename: package_element = class_element.getparent() if package_element is not None and package_element.tag == 'package': sourcefile_element = package_element.find(f"sourcefile[@name='{sourcefilename}']") if sourcefile_element is not None: # 查找 mi > 0 的行号(未覆盖的行) missing_lines = [ int(line.get('nr')) for line in sourcefile_element.findall('line') if line.get('mi') and int(line.get('mi')) > 0 ] missing_lines_str = ",".join(map(str, missing_lines)) if missing_lines else "-" # 查找 mb > 0 的分支(未覆盖的分支) missing_branches = [ int(line.get('nr')) for line in sourcefile_element.findall('line') if line.get('mb') and int(line.get('mb')) > 0 ] missing_branches_str = ",".join(map(str, missing_branches)) if missing_branches else "-" else: missing_lines_str = "N/A" missing_branches_str = "N/A" else: missing_lines_str = "N/A" missing_branches_str = "N/A" else: missing_lines_str = "N/A" missing_branches_str = "N/A" # 计算行覆盖率 total_lines = lines_missed + lines_covered if total_lines > 0: line_coverage = (lines_covered / total_lines) * 100 else: line_coverage = 0.0 table_data.append([class_name, f"{line_coverage:.2f}%", branch_coverage, missing_lines_str, missing_branches_str]) # Populate coverage_data dictionary if there are missing lines if missing_lines_str != "N/A" and missing_lines_str != "-": coverage_data[class_name] = [int(line) for line in missing_lines_str.split(',')] # 计算总覆盖率 total_lines = total_lines_missed + total_lines_covered if total_lines > 0: total_line_coverage = (total_lines_covered / total_lines) * 100 else: total_line_coverage = 0.0 total_branches = total_branches_missed + total_branches_covered if total_branches > 0: total_branch_coverage = (total_branches_covered / total_branches) * 100 else: total_branch_coverage = "N/A" # 添加总覆盖率到数据表 table_data.append(["-----------------------------", "----------------", "----------------", "----------------", "----------------"]) table_data.append(["Total Coverage:", f"{total_line_coverage:.2f}%", f"{total_branch_coverage}", "", ""]) # 计算每列的最大宽度 num_columns = len(table_data[0]) col_widths = [0] * num_columns for row in table_data: for idx, col in enumerate(row): if len(col) > col_widths[idx]: col_widths[idx] = len(col) # 打印覆盖率详情 print("\n=== 覆盖率详情 ===") for row in table_data: if len(row) == 1: print(row[0]) else: row_str = "" for idx, col in enumerate(row): # 左对齐并加上适当的空格 row_str += col.ljust(col_widths[idx] + 2) print(row_str) sys.stdout.flush() # 立即刷新输出 return coverage_data def get_modified_files(repo, base_branch, path_pattern='src/main/**/*.java'): """ 获取相对于 base_branch 的修改文件及其修改的行号 返回一个字典:{file_path: set(line_numbers)} """ # 获取 diff 输出,带有行号 diff_command = [ 'git', 'diff', base_branch, '--unified=0', '--', path_pattern ] try: diff_output = subprocess.check_output(diff_command, stderr=subprocess.STDOUT).decode('utf-8') except subprocess.CalledProcessError as e: print(f"执行 git diff 失败: {e.output.decode('utf-8')}") sys.stdout.flush() sys.exit(1) modified_files = defaultdict(set) # {file_path: set(line_numbers)} current_file = None line_num = 0 for line in diff_output.splitlines(): if line.startswith('diff --git'): parts = line.split() if len(parts) >= 4: a_path = parts[2][2:] b_path = parts[3][2:] current_file = b_path # 使用新文件路径 elif line.startswith('@@'): if current_file: # 解析 hunk header # 格式: @@ -start,count +start,count @@ hunk_header = line try: hunk_info = hunk_header.split(' ')[1:3] new_hunk = hunk_info[1] start, count = new_hunk[1:].split(',') start = int(start) count = int(count) except Exception as e: print(f"解析 hunk header 失败: {hunk_header}, 错误: {e}") sys.stdout.flush() continue line_num = start elif line.startswith('+') and not line.startswith('+++'): if current_file: modified_files[current_file].add(line_num) line_num += 1 elif line.startswith('-') and not line.startswith('---'): # 删除的行,不需要增加 line_num pass else: # 其他情况,不处理 pass return modified_files def generate_incremental_coverage_report(modified_files, coverage_data): """ 生成增量代码覆盖率报告 """ total_modified = 0 total_uncovered = 0 per_file_report = [] # 遍历修改的文件和行号 for file_path, modified_lines in modified_files.items(): # 将文件路径转换为类名,去掉 'java/src/main/java/' 和 '.java' class_name = file_path.replace('java/src/main/java/', '').replace('.java', '') # 从 coverage_data 中查找对应的类名 missing_lines = coverage_data.get(class_name, []) # 计算被覆盖和未被覆盖的行 uncovered_in_modified = modified_lines.intersection(missing_lines) covered_in_modified = modified_lines.difference(missing_lines) total_modified += len(modified_lines) total_uncovered += len(uncovered_in_modified) # 保存每个文件的报告 per_file_report.append({ 'file': file_path, 'modified': sorted(modified_lines), 'covered': sorted(covered_in_modified), 'uncovered': sorted(uncovered_in_modified) }) # 计算增量覆盖率 coverage_percent = ((total_modified - total_uncovered) / total_modified) * 100 if total_modified > 0 else 0.0 # 生成报告输出 report_lines = [] report_lines.append("=== 增量代码覆盖率报告 ===\n") for file_report in per_file_report: report_lines.append(f"文件: {file_report['file']}") report_lines.append(f"修改的行数: {len(file_report['modified'])}") report_lines.append(f"被覆盖的行数: {len(file_report['covered'])}") report_lines.append(f"未被覆盖的行数: {len(file_report['uncovered'])}") if file_report['uncovered']: report_lines.append(f"未覆盖的行号: {', '.join(map(str, file_report['uncovered']))}") report_lines.append("\n") # 增量覆盖率总结 report_lines.append("=== 总结 ===") report_lines.append(f"总修改行数: {total_modified}") report_lines.append(f"总未被覆盖行数: {total_uncovered}") report_lines.append(f"增量覆盖率: {coverage_percent:.2f}%") report = "\n".join(report_lines) print(report) sys.stdout.flush() return { 'total_modified': total_modified, 'total_uncovered': total_uncovered, 'coverage_percent': coverage_percent, 'details': per_file_report } def main(): """主函数,执行所有步骤。""" parse_surefire_reports() # 假设 parse_jacoco() 已经正确返回了覆盖率数据字典 coverage_data = parse_jacoco() # 获取增量修改的文件及行号 repo = Repo(os.getcwd(), search_parent_directories=True) base_branch = "upstream/master" modified_files = get_modified_files(repo, base_branch, path_pattern='src/main/**/*.java') # 生成增量覆盖率报告 generate_incremental_coverage_report(modified_files, coverage_data) if __name__ == "__main__": main()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/baidubce/app-builder'

If you have feedback or need assistance with the MCP directory API, please join our Discord server