test_functions.py•43.5 kB
#!/usr/bin/env python3
"""
GitLab MCP Server 端到端测试脚本
这个脚本会实际调用GitLab API进行测试,请确保设置了正确的环境变量。
注意:运行此测试会在您的GitLab账号中创建真实的资源。
"""
import os
from dotenv import load_dotenv
import traceback
# 加载环境变量
load_dotenv()
from functions import (
create_repository,
fork_repository,
delete_repository,
search_repositories,
create_or_update_file,
push_files,
get_file_contents,
create_issue,
get_issues,
create_merge_request,
get_merge_request_diff,
create_branches,
delete_branches,
create_tags,
delete_tags
)
def test_create_repository():
"""测试创建仓库功能"""
# 创建一个测试项目
project_name = "mcp-test"
result = create_repository(
name=project_name,
description="GitLab MCP测试项目",
visibility="private",
initialize_with_readme=True
)
# 验证create_repository的输出字段
assert "status" in result, "创建仓库结果缺少status字段"
assert result["status"] == "success", f"创建仓库失败,状态: {result.get('status')}"
assert "message" in result, "创建仓库结果缺少message字段"
assert "id" in result, "创建仓库结果缺少id字段"
assert "name" in result, "创建仓库结果缺少name字段"
assert "web_url" in result, "创建仓库结果缺少web_url字段"
assert result["name"] == project_name, "返回的项目名称不匹配"
project_id = result["id"]
project_url = result["web_url"]
print(f"✅ 创建测试项目成功: {project_name} (ID: {project_id})")
print(f"项目URL: {project_url}")
print(f"返回消息: {result['message']}")
return project_id, project_name, project_url
def test_fork_repository():
"""测试Fork仓库功能"""
source_project_id = 1813
source_project_name = "decompose_task"
print(f"测试源项目: {source_project_name} (ID: {source_project_id})")
# Fork项目(不指定额外参数)
fork_result = fork_repository(source_project_id)
# 验证fork_repository的输出字段
assert "status" in fork_result, "Fork结果缺少status字段"
assert fork_result["status"] == "success", f"Fork失败,状态: {fork_result.get('status')}"
assert "message" in fork_result, "Fork结果缺少message字段"
assert "id" in fork_result, "Fork结果缺少id字段"
assert "name" in fork_result, "Fork结果缺少name字段"
assert "web_url" in fork_result, "Fork结果缺少web_url字段"
assert "forked_from_id" in fork_result, "Fork结果缺少forked_from_id字段"
assert fork_result["forked_from_id"] == source_project_id, "源项目ID不匹配"
forked_project_id = fork_result["id"]
forked_project_name = fork_result["name"]
forked_project_url = fork_result["web_url"]
print(f"✅ 成功Fork项目: {forked_project_name} (ID: {forked_project_id})")
print(f"Fork项目URL: {forked_project_url}")
# 测试fork不存在的项目
non_existent_id = "99999999"
error_result = fork_repository(non_existent_id)
# 验证错误处理
assert "status" in error_result, "错误结果格式不正确"
assert error_result["status"] == "error", f"Fork不存在项目应返回error状态,实际: {error_result.get('status')}"
assert "message" in error_result, "错误结果缺少错误信息"
print(f"✅ 错误处理验证成功: {error_result['message']}")
# 清理:删除创建的项目
print("\n=== 清理测试项目 ===")
# 删除fork项目
delete_fork = delete_repository(forked_project_id)
if delete_fork.get("status") == "success":
print(f"✅ 已删除Fork项目 {forked_project_name}")
print("✅✅✅ Fork仓库测试通过!")
return forked_project_id, forked_project_name, forked_project_url
def test_delete_repository():
"""测试删除仓库功能"""
# 先创建一个用于删除测试的项目
project_name = "mcp-test-delete"
create_result = create_repository(
name=project_name,
description="用于删除测试的项目",
visibility="private",
initialize_with_readme=True
)
assert create_result["status"] == "success", "创建测试项目失败"
project_id = create_result["id"]
project_url = create_result["web_url"]
print(f"创建用于删除测试的项目: {project_name} (ID: {project_id})")
print(f"项目URL: {project_url}")
# 删除刚创建的项目
delete_result = delete_repository(project_id)
# 验证delete_repository的输出字段
assert "status" in delete_result, "删除结果缺少status字段"
assert delete_result["status"] == "success", f"删除项目失败,状态: {delete_result.get('status')}"
assert "message" in delete_result, "删除结果缺少message字段"
print(f"✅ 成功删除项目: {delete_result['message']}")
# 测试删除不存在的项目
non_existent_id = "99999999"
error_result = delete_repository(non_existent_id)
# 验证错误处理
assert "status" in error_result, "错误结果格式不正确"
assert error_result["status"] == "error", f"删除不存在项目应返回error状态,实际: {error_result.get('status')}"
assert "message" in error_result, "错误结果缺少错误信息"
print(f"✅ 错误处理验证成功: {error_result['message']}")
print("✅✅✅ 删除仓库测试通过!")
return project_id, project_name, project_url
def test_search_repositories():
"""测试搜索仓库功能"""
search_term = "mcp-test" # 使用更通用的搜索词以确保能找到项目
result = search_repositories(search=search_term, page=1, per_page=100)
# 验证search_repositories的输出字段
assert "status" in result, "搜索结果缺少status字段"
assert result["status"] == "success", f"搜索失败,状态: {result.get('status')}"
assert "message" in result, "搜索结果缺少message字段"
assert "projects" in result, "搜索结果缺少projects字段"
assert isinstance(result["projects"], list), "projects字段应该是列表类型"
assert len(result["projects"]) > 0, "未找到任何项目"
# 验证项目列表中每个项目的字段
project = result["projects"][0]
assert "id" in project, "项目缺少id字段"
assert "name" in project, "项目缺少name字段"
assert "path" in project, "项目缺少path字段"
print(f"✅ 找到项目: {project['name']} (ID: {project['id']})")
print(f"搜索消息: {result['message']}")
return project["id"], project["name"], f"https://gitlab.com/{project['path']}"
def test_create_or_update_file():
project_id, project_name, project_url = test_search_repositories()
"""测试文件操作功能"""
# 1. 检查README文件内容
readme = get_file_contents(project_id, "README.md")
assert "content" in readme, "README.md 文件内容获取失败"
print("✅ 读取README.md成功")
branch="master"
# 2. 创建一个新文件
create_result = create_or_update_file(
project_id=project_id,
file_path="test.py",
content="print('Hello GitLab MCP!')",
commit_message="添加测试文件",
branch=branch
)
# 验证create_or_update_file的输出字段
assert "status" in create_result, "创建文件结果缺少status字段"
assert create_result["status"] == "success", f"创建文件失败,状态: {create_result.get('status')}"
assert "message" in create_result, "创建文件结果缺少message字段"
assert "file_path" in create_result, "创建文件结果缺少file_path字段"
assert create_result["file_path"] == "test.py", "返回的文件路径不匹配"
print(f"✅ 创建文件成功: {create_result['message']}")
# 3. 更新文件
update_result = create_or_update_file(
project_id=project_id,
file_path="test.py",
content="print('Updated content!')",
commit_message="更新测试文件",
branch=branch
)
assert update_result["status"] == "success", "更新文件失败"
print("✅ 更新文件成功")
# 4. 获取更新后的文件内容
updated_content = get_file_contents(project_id, "test.py")
assert updated_content["content"] == "print('Updated content!')", "文件内容更新验证失败"
print("✅ 文件内容验证成功")
return project_id, project_name, project_url
def test_push_files():
project_id, project_name, project_url = test_search_repositories()
"""测试批量推送文件功能"""
# 准备多个文件
files = [
{
"file_path": "src/gitlab_mcp_server.py",
"content": "# GitLab MCP Server\n\ndef main():\n print('Hello MCP Server')"
},
{
"file_path": "src/helper.py",
"content": "def helper_function():\n return 'Helper function'"
},
{
"file_path": "mcp_tool.json",
"content": "{\n \"name\": \"MCP Tool\",\n \"version\": \"1.0.0\"\n}"
}
]
branch="dev003"
ref_branch="master"
# 批量推送文件
result = push_files(
project_id=project_id,
files=files,
commit_message="添加多个文件",
branch=branch,
ref_branch=ref_branch # 使用一个不存在的分支来测试创建新分支
)
# 验证推送结果
assert result["status"] == "success", "批量推送文件失败"
assert "commit_id" in result, "未返回提交ID"
assert result["files_count"] == 3, f"文件数量不匹配,应为3,实际为{result['files_count']}"
print(f"✅ 批量推送 {result['files_count']} 个文件成功,提交ID: {result.get('commit_short_id')}")
# 验证文件是否真的被创建了
for file_data in files:
content = get_file_contents(project_id, file_data["file_path"], branch)
assert "content" in content, f"文件 {file_data['file_path']} 未创建成功"
assert content["content"] == file_data["content"], f"文件 {file_data['file_path']} 内容不匹配"
print("✅ 所有文件内容验证成功")
# 测试更新和删除操作
update_files = [
{
"file_path": "src/helper.py",
"content": "# 已更新\ndef main():\n print('Updated main function')",
"action": "update"
},
{
"file_path": "mcp_tool.json",
"action": "delete"
}
]
# 批量操作
update_result = push_files(
project_id=project_id,
files=update_files,
commit_message="更新和删除文件",
branch=branch,
ref_branch=branch # 使用同一分支进行更新和删除操作
)
# 验证push_files的输出字段
assert "status" in update_result, "批量更新文件结果缺少status字段"
assert update_result["status"] == "success", f"批量更新文件失败,状态: {update_result.get('status')}"
assert "message" in update_result, "批量更新文件结果缺少message字段"
assert "commit_id" in update_result, "批量更新文件结果缺少commit_id字段"
assert "commit_short_id" in update_result, "批量更新文件结果缺少commit_short_id字段"
assert "files_count" in update_result, "批量更新文件结果缺少files_count字段"
print(f"✅ 批量更新和删除文件成功: {update_result['message']}")
print(f"提交ID: {update_result['commit_short_id']}, 文件数: {update_result['files_count']}")
# 验证更新的文件
main_content = get_file_contents(project_id, "src/helper.py", branch)
assert "# 已更新" in main_content["content"], "文件更新验证失败"
print("✅ 文件更新验证成功")
# 验证删除的文件不存在
deleted_content = get_file_contents(project_id, "config.json", branch)
assert "error" in deleted_content, "文件删除验证失败"
print("✅ 文件删除验证成功")
return project_id, project_name, project_url
def test_get_file_contents():
"""测试获取文件内容功能"""
project_id, project_name, project_url = test_search_repositories()
# 获取README.md文件内容
readme = get_file_contents(project_id, "README.md")
# 验证get_file_contents的输出字段
assert "status" in readme, "获取文件内容结果缺少status字段"
assert readme["status"] == "success", f"获取文件内容失败,状态: {readme.get('status')}"
assert "message" in readme, "获取文件内容结果缺少message字段"
assert "content" in readme, "获取文件内容结果缺少content字段"
print(f"✅ 读取 {project_name} 的 README.md 成功: {readme['message']}")
print(f"文件内容预览: {readme['content'][:100]}...")
return project_id, project_name, project_url
def test_create_issue():
"""测试创建问题功能"""
project_id, project_name, project_url = test_search_repositories()
# 创建一个测试问题
issue_title = "测试问题test"
issue_description = "这是一个通过API创建的测试问题test"
issue_labels = ["测试test", "自动化test"]
issue = create_issue(
project_id=project_id,
title=issue_title,
description=issue_description,
labels=issue_labels
)
# 验证create_issue的输出字段
assert "status" in issue, "创建问题结果缺少status字段"
assert issue["status"] == "success", f"创建问题失败,状态: {issue.get('status')}"
assert "message" in issue, "创建问题结果缺少message字段"
assert "id" in issue, "创建问题结果缺少id字段"
assert "web_url" in issue, "创建问题结果缺少web_url字段"
assert "state" in issue, "创建问题结果缺少state字段"
print(f"✅ 创建问题成功: {issue['message']}")
print(f"问题详情 - ID: {issue['id']}, 状态: {issue['state']}, URL: {issue['web_url']}")
return project_id, project_name, project_url
def test_get_issues():
"""测试获取issues功能,参考test_create_issue函数实现"""
project_id, project_name, project_url = test_search_repositories()
print("\n=== 开始获取issues测试 ===")
# 1. 基础测试:获取所有issues
all_issues_result = get_issues(project_id=project_id)
# 验证基本返回结果
assert "status" in all_issues_result, "未返回状态信息"
assert all_issues_result["status"] == "success", f"获取issues失败,状态: {all_issues_result.get('status')}"
assert "issues" in all_issues_result, "未返回issues列表"
assert "total_count" in all_issues_result, "未返回总数信息"
assert "page" in all_issues_result, "未返回页码信息"
assert "per_page" in all_issues_result, "未返回每页数量信息"
issues_count = len(all_issues_result["issues"])
print(f"✅ 成功获取 {issues_count} 个issues")
# 验证issues的结构(如果有issues的话)
if issues_count > 0:
first_issue = all_issues_result["issues"][0]
# 验证issue的必要字段,参考create_issue的返回格式
assert "id" in first_issue, "issue缺少ID字段"
assert "title" in first_issue, "issue缺少标题字段"
assert "state" in first_issue, "issue缺少状态字段"
assert "web_url" in first_issue, "issue缺少URL字段"
assert "description" in first_issue, "issue缺少描述字段"
assert "labels" in first_issue, "issue缺少标签字段"
assert "author" in first_issue, "issue缺少作者字段"
assert "created_at" in first_issue, "issue缺少创建时间字段"
assert "updated_at" in first_issue, "issue缺少更新时间字段"
print(f"✅ 第一个issue验证成功: {first_issue['title']} (ID: {first_issue['id']}, 状态: {first_issue['state']})")
# 2. 测试状态过滤
print("\n=== 测试状态过滤 ===")
# 获取开放状态的issues
opened_issues_result = get_issues(project_id=project_id, state="opened")
assert opened_issues_result["status"] == "success", "获取开放issues失败"
opened_count = len(opened_issues_result["issues"])
print(f"✅ 获取开放状态issues: {opened_count} 个")
# 验证开放issues的状态
for issue in opened_issues_result["issues"]:
assert issue["state"] == "opened", f"状态过滤失败,期望: opened, 实际: {issue['state']}"
# 获取关闭状态的issues
closed_issues_result = get_issues(project_id=project_id, state="closed")
assert closed_issues_result["status"] == "success", "获取关闭issues失败"
closed_count = len(closed_issues_result["issues"])
print(f"✅ 获取关闭状态issues: {closed_count} 个")
# 验证关闭issues的状态
for issue in closed_issues_result["issues"]:
assert issue["state"] == "closed", f"状态过滤失败,期望: closed, 实际: {issue['state']}"
# 3. 测试标签过滤
print("\n=== 测试标签过滤 ===")
# 使用单个标签过滤
labeled_issues_result = get_issues(project_id=project_id, labels="测试test")
assert labeled_issues_result["status"] == "success", "标签过滤失败"
labeled_count = len(labeled_issues_result["issues"])
print(f"✅ 获取包含'测试test'标签的issues: {labeled_count} 个")
# 使用多个标签过滤
multi_labeled_issues_result = get_issues(project_id=project_id, labels=["测试test", "自动化test"])
assert multi_labeled_issues_result["status"] == "success", "多标签过滤失败"
multi_labeled_count = len(multi_labeled_issues_result["issues"])
print(f"✅ 获取包含多个标签的issues: {multi_labeled_count} 个")
# 4. 测试分页功能
print("\n=== 测试分页功能 ===")
# 测试第一页,每页2个
page1_result = get_issues(project_id=project_id, page=1, per_page=2)
assert page1_result["status"] == "success", "分页测试失败"
assert page1_result["page"] == 1, f"页码不匹配,期望: 1, 实际: {page1_result['page']}"
assert page1_result["per_page"] == 2, f"每页数量不匹配,期望: 2, 实际: {page1_result['per_page']}"
page1_count = len(page1_result["issues"])
print(f"✅ 分页测试成功 - 第1页: {page1_count} 个issues")
# 如果有足够的issues,测试第二页
if issues_count > 2:
page2_result = get_issues(project_id=project_id, page=2, per_page=2)
assert page2_result["status"] == "success", "第二页获取失败"
assert page2_result["page"] == 2, f"页码不匹配,期望: 2, 实际: {page2_result['page']}"
page2_count = len(page2_result["issues"])
print(f"✅ 第2页测试成功: {page2_count} 个issues")
# 5. 测试组合过滤条件
print("\n=== 测试组合过滤条件 ===")
combined_result = get_issues(
project_id=project_id,
state="opened",
labels="测试test",
page=1,
per_page=5
)
assert combined_result["status"] == "success", "组合过滤失败"
combined_count = len(combined_result["issues"])
print(f"✅ 组合过滤测试成功: 开放状态+测试标签 = {combined_count} 个issues")
# 验证组合过滤的结果
for issue in combined_result["issues"]:
assert issue["state"] == "opened", f"组合过滤状态验证失败,期望: opened, 实际: {issue['state']}"
# 6. 测试错误处理
print("\n=== 测试错误处理 ===")
# 使用无效的项目ID
invalid_project_result = get_issues(project_id="99999999")
assert invalid_project_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {invalid_project_result['status']}"
assert "message" in invalid_project_result, "错误结果应该包含错误信息"
print(f"✅ 错误处理验证成功: {invalid_project_result['message']}")
# 7. 性能和数据完整性测试
print("\n=== 数据完整性验证 ===")
# 获取详细的issue信息并验证
if issues_count > 0:
detailed_result = get_issues(project_id=project_id, per_page=1)
if len(detailed_result["issues"]) > 0:
issue = detailed_result["issues"][0]
# 验证URL格式
assert "http" in issue["web_url"], f"URL格式可能不正确: {issue['web_url']}"
# 验证时间格式(应该包含日期)
assert len(issue["created_at"]) > 10, f"创建时间格式可能不正确: {issue['created_at']}"
# 验证作者不为空
assert issue["author"] != "", f"作者信息不应为空: {issue['author']}"
print(f"✅ 数据完整性验证成功")
print(f" - 标题: {issue['title']}")
print(f" - 作者: {issue['author']}")
print(f" - 创建时间: {issue['created_at']}")
print(f" - 标签数量: {len(issue['labels'])}")
print(f"\n✅✅✅ 所有获取issues测试通过!")
print(f"总结:")
print(f" - 总issues数量: {issues_count}")
print(f" - 开放状态: {opened_count}")
print(f" - 关闭状态: {closed_count}")
print(f" - 包含测试标签: {labeled_count}")
return project_id, project_name, project_url
def test_create_merge_request():
project_id, project_name, project_url = test_search_repositories()
branch_name = f"feature"
# 1. 创建一个新分支
file_result = create_or_update_file(
project_id=project_id,
file_path=f"feature-{branch_name}.txt",
content=f"This is a test file for branch {branch_name}",
commit_message=f"创建分支 {branch_name}",
branch=branch_name
)
assert file_result["status"] == "success", f"分支文件操作验证失败,状态: {file_result['status']}"
print(f"✅ 创建分支 {branch_name} 成功")
# 2. 创建合并请求
mr_title = f"添加功能 {branch_name}"
mr = create_merge_request(
project_id=project_id,
title=mr_title,
source_branch=branch_name,
target_branch="master",
description="这是一个测试合并请求",
draft=True
)
# 验证create_merge_request的输出字段
assert "status" in mr, "创建合并请求结果缺少status字段"
assert mr["status"] == "success", f"创建合并请求失败,状态: {mr.get('status')}"
assert "message" in mr, "创建合并请求结果缺少message字段"
assert "id" in mr, "创建合并请求结果缺少id字段"
assert "web_url" in mr, "创建合并请求结果缺少web_url字段"
assert "state" in mr, "创建合并请求结果缺少state字段"
print(f"✅ 创建合并请求成功: {mr['message']}")
print(f"合并请求详情 - ID: {mr['id']}, 状态: {mr['state']}, URL: {mr['web_url']}")
return project_id, project_name, project_url
def test_get_merge_request_diff():
"""测试获取合并请求差异功能"""
project_id, project_name, project_url = test_search_repositories()
branch_name = f"get-diff-test"
# 1. 创建一个新分支并添加一些变更
file_result = create_or_update_file(
project_id=project_id,
file_path=f"diff-test-{branch_name}.txt",
content=f"这是用于测试差异的文件内容\n分支: {branch_name}\n添加了一些测试内容",
commit_message=f"在分支 {branch_name} 上添加测试文件",
branch=branch_name
)
assert file_result["status"] == "success", f"分支文件操作验证,状态: {file_result['status']}"
print(f"✅ 创建分支 {branch_name} 和测试文件成功")
# 2. 创建合并请求
mr_title = f"测试差异获取 {branch_name}"
mr = create_merge_request(
project_id=project_id,
title=mr_title,
source_branch=branch_name,
target_branch="master",
description="这是一个用于测试获取差异的合并请求",
draft=True
)
assert "id" in mr, "合并请求创建失败"
assert "web_url" in mr, "合并请求URL获取失败"
merge_request_iid = mr["id"]
print(f"✅ 创建合并请求成功,ID: {merge_request_iid}, URL: {mr['web_url']}")
# 3. 获取合并请求差异
print(f"\n=== 开始获取合并请求差异测试 ===")
diff_result = get_merge_request_diff(
project_id=project_id,
merge_request_iid=merge_request_iid
)
# 验证基本返回结果
assert "status" in diff_result, "差异结果格式不正确"
assert diff_result["status"] == "success", f"获取合并请求差异失败,状态: {diff_result.get('status')}"
assert "diffs" in diff_result, "差异结果缺少diffs字段"
assert "merge_request_iid" in diff_result, "差异结果缺少merge_request_iid字段"
assert "project_id" in diff_result, "差异结果缺少project_id字段"
# 验证返回的ID正确性
assert diff_result["merge_request_iid"] == merge_request_iid, f"返回的合并请求ID不匹配,期望: {merge_request_iid}, 实际: {diff_result['merge_request_iid']}"
assert diff_result["project_id"] == project_id, f"返回的项目ID不匹配,期望: {project_id}, 实际: {diff_result['project_id']}"
# 验证diffs列表结构
diffs = diff_result["diffs"]
assert isinstance(diffs, list), "diffs应该是一个列表"
assert len(diffs) > 0, "应该有至少一个文件变更"
print(f"✅ 获取到 {len(diffs)} 个文件变更")
# 验证每个diff的结构
for i, diff in enumerate(diffs):
assert "old_path" in diff, f"第{i+1}个diff缺少old_path字段"
assert "new_path" in diff, f"第{i+1}个diff缺少new_path字段"
assert "diff_refs" in diff, f"第{i+1}个diff缺少diff_refs字段"
assert "diff" in diff, f"第{i+1}个diff缺少diff字段"
print(f" 📄 文件变更 {i+1}: {diff['new_path']}")
# 验证我们创建的测试文件是否在变更中
if diff['new_path'] == f"diff-test-{branch_name}.txt":
assert diff['new_path'] == f"diff-test-{branch_name}.txt", "文件路径不匹配"
print(f" ✅ 找到测试文件的变更: {diff['new_path']}")
print("✅ 所有差异结构验证成功")
# 4. 测试错误处理 - 无效项目ID
print("\n=== 测试错误处理 ===")
error_result = get_merge_request_diff(
project_id="99999999",
merge_request_iid=merge_request_iid
)
assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}"
assert "message" in error_result, "错误结果应该包含错误信息"
print(f"✅ 无效项目ID错误处理验证成功: {error_result['message']}")
# 5. 测试错误处理 - 无效合并请求ID
invalid_mr_result = get_merge_request_diff(
project_id=project_id,
merge_request_iid=99999999
)
assert invalid_mr_result["status"] == "error", f"无效合并请求ID应该返回error状态,实际: {invalid_mr_result['status']}"
assert "message" in invalid_mr_result, "错误结果应该包含错误信息"
print(f"✅ 无效合并请求ID错误处理验证成功: {invalid_mr_result['message']}")
# 6. 数据完整性验证
print("\n=== 数据完整性验证 ===")
# 验证diff_refs结构
if len(diffs) > 0:
first_diff = diffs[0]
diff_refs = first_diff["diff_refs"]
# diff_refs应该包含base_sha和head_sha等信息
assert hasattr(diff_refs, '__dict__') or isinstance(diff_refs, dict), "diff_refs应该是一个对象或字典"
print(f"✅ diff_refs结构验证成功")
# 验证diff内容不为空(对于新增文件)
if first_diff["new_path"] == f"diff-test-{branch_name}.txt":
diff_content = first_diff["diff"]
assert diff_content is not None and len(diff_content) > 0, "diff内容不应为空"
assert "get-diff-test" in diff_content, "diff内容应包含文件变更"
print(f"✅ diff内容验证成功")
print(f"\n✅✅✅ 所有获取合并请求差异测试通过!")
print(f"测试总结:")
print(f" - 项目ID: {project_id}")
print(f" - 合并请求ID: {merge_request_iid}")
print(f" - 文件变更数: {len(diffs)}")
print(f" - 测试分支: {branch_name}")
return project_id, project_name, project_url
def test_create_branches():
"""测试分支创建功能"""
project_id, project_name, project_url = test_search_repositories()
# 准备测试数据(分支列表)
test_branches = [
{"branch_name": "feature", "ref_branch": "master"},
{"branch_name": "dev", "ref_branch": "master"}
]
# 批量执行操作并验证结果
for branch_data in test_branches:
result = create_branches(
project_id=project_id,
branch_name=branch_data["branch_name"],
ref_branch=branch_data["ref_branch"]
)
# 验证create_branches的输出字段
assert "status" in result, "创建分支结果缺少status字段"
assert result["status"] in ["success"], f"分支创建失败,状态: {result['status']}"
assert "message" in result, "创建分支结果缺少message字段"
assert "branch_name" in result, "创建分支结果缺少branch_name字段"
assert "commit" in result, "创建分支结果缺少commit字段"
# 验证分支名称正确性
assert result["branch_name"] == branch_data["branch_name"], f"分支名称不匹配,预期: {branch_data['branch_name']}, 实际: {result['branch_name']}"
# 验证重复创建处理
duplicate_result = create_branches(
project_id=project_id,
branch_name=branch_data["branch_name"],
ref_branch=branch_data["ref_branch"]
)
assert duplicate_result["status"] == "success", f"重复创建分支验证失败,预期: success, 实际: {duplicate_result['status']}"
# 在新分支上创建文件
file_result = create_or_update_file(
project_id=project_id,
file_path=f"branch-test-{branch_data['branch_name']}.txt",
content=f"这是在分支 {branch_data['branch_name']} 上创建的测试文件",
commit_message=f"在分支 {branch_data['branch_name']} 上添加测试文件",
branch=branch_data["branch_name"]
)
assert file_result["status"] == "success", f"分支文件操作验证失败,状态: {file_result['status']}"
# 验证文件内容
file_content = get_file_contents(project_id, f"branch-test-{branch_data['branch_name']}.txt", ref=branch_data["branch_name"])
assert file_content["content"] == f"这是在分支 {branch_data['branch_name']} 上创建的测试文件", f"分支文件内容验证失败,预期: 这是在分支 {branch_data['branch_name']} 上创建的测试文件,实际: {file_content['content']}"
print("✅ 批量创建/检查完成")
print("✅ 重复创建分支验证成功")
print("✅ 分支文件操作验证成功")
return project_id, project_name, project_url
def test_delete_branches():
project_id, project_name, project_url = test_search_repositories()
# 准备测试数据:先创建一些分支用于删除测试
delete_branches_info = ["feature", "nonexistent-branch", "dev003"]
print(f"\n=== 开始删除分支测试 ===")
# 测试删除存在的分支
deleted_branches = []
for branch_name in delete_branches_info:
result = delete_branches(
project_id=project_id,
branch_name=branch_name
)
# 验证delete_branches的输出字段
assert "status" in result, "删除分支结果缺少status字段"
assert result["status"] in ["success", "error"], f"删除分支状态异常: {result.get('status')}"
assert "message" in result, "删除分支结果缺少message字段"
if result["status"] == "success":
deleted_branches.append(branch_name)
print(f"✅ 成功删除分支: {branch_name} - {result['message']}")
else:
print(f"⚠️ 删除分支失败: {branch_name} - {result['message']}")
print(f"✅ 批量删除 {len(deleted_branches)} 个分支完成")
# 测试删除不存在的分支
print("\n=== 测试删除不存在的分支 ===")
non_existent_branch = "non-existent-branch-12345"
not_found_result = delete_branches(
project_id=project_id,
branch_name=non_existent_branch
)
# 验证不存在分支的处理
assert not_found_result["status"] == "success", f"删除不存在分支应该返回success状态,实际: {not_found_result['status']}"
assert "message" in not_found_result, "错误结果应该包含消息"
assert non_existent_branch in not_found_result["message"], "错误消息应该包含分支名称"
print(f"✅ 删除不存在分支验证成功: {not_found_result['message']}")
# 测试删除默认分支(应该失败)
print("\n=== 测试删除默认分支 ===")
# 获取项目的默认分支
from src.gitlab_mcp_server import get_gitlab_client
gl = get_gitlab_client()
project = gl.projects.get(project_id)
default_branch = project.default_branch
default_branch_result = delete_branches(
project_id=project_id,
branch_name=default_branch
)
# 验证默认分支删除保护
assert default_branch_result["status"] == "error", f"删除默认分支应该返回error状态,实际: {default_branch_result['status']}"
assert "message" in default_branch_result, "错误结果应该包含错误信息"
assert "默认分支" in default_branch_result["message"] or "default branch" in default_branch_result["message"], "错误消息应该提到默认分支"
print(f"✅ 默认分支保护验证成功: {default_branch_result['message']}")
# 测试重复删除同一个分支(应该返回not_found)
print("\n=== 测试重复删除分支 ===")
if deleted_branches:
duplicate_delete_result = delete_branches(
project_id=project_id,
branch_name=deleted_branches[0]
)
# 验证重复删除的结果
assert duplicate_delete_result["status"] == "success", f"重复删除分支应该返回success状态,实际: {duplicate_delete_result['status']}"
print(f"✅ 重复删除分支验证成功: {duplicate_delete_result['message']}")
print(f"\n✅✅✅ 所有删除分支测试通过!")
return project_id, project_name, project_url
def test_create_tags():
"""测试标签创建功能"""
project_id, project_name, project_url = test_search_repositories()
# 准备测试数据(标签列表)
test_tags = [
{"tag_name": "v1.0.0", "ref": "master", "message": "第一个版本发布"},
{"tag_name": "v1.1.0", "ref": "master", "message": "功能更新版本"}
]
print(f"\n=== 开始创建标签测试 ===")
# 批量执行操作并验证结果
for tag_data in test_tags:
result = create_tags(
project_id=project_id,
tag_name=tag_data["tag_name"],
ref_branch=tag_data["ref"],
message=tag_data["message"]
)
# 验证create_tags的输出字段
assert "status" in result, "创建标签结果缺少status字段"
assert result["status"] == "success", f"标签创建失败,状态: {result['status']}"
assert "message" in result, "创建标签结果缺少message字段"
assert "tag_name" in result, "创建标签结果缺少tag_name字段"
assert "commit" in result, "创建标签结果缺少commit字段"
# 验证标签名称正确性
assert result["tag_name"] == tag_data["tag_name"], f"标签名称不匹配,预期: {tag_data['tag_name']}, 实际: {result['tag_name']}"
print(f"✅ 创建标签: {result['tag_name']} - 状态: {result['status']}")
print(f" 消息: {result['message']}")
# 验证重复创建处理
duplicate_result = create_tags(
project_id=project_id,
tag_name=tag_data["tag_name"],
ref_branch=tag_data["ref"],
message=tag_data["message"]
)
assert duplicate_result["status"] == "success", f"重复创建标签验证失败,预期: success, 实际: {duplicate_result['status']}"
# 测试创建无消息的标签
simple_tag_result = create_tags(
project_id=project_id,
tag_name="v0.9.0",
ref_branch="master"
)
assert simple_tag_result["status"] == "success", f"创建简单标签失败,状态: {simple_tag_result['status']}"
print(f"✅ 创建简单标签: {simple_tag_result['tag_name']} - 状态: {simple_tag_result['status']}")
# 测试错误处理 - 无效项目ID
print("\n=== 测试错误处理 ===")
error_result = create_tags(
project_id="99999999",
tag_name="v2.0.0",
ref_branch="master",
message="测试标签"
)
assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}"
assert "message" in error_result, "错误结果应该包含错误信息"
print(f"✅ 错误处理验证成功: {error_result['message']}")
print("✅ 批量创建/检查标签完成")
print("✅ 重复创建标签验证成功")
print("✅ 简单标签创建验证成功")
print("✅✅✅ 所有创建标签测试通过!")
return project_id, project_name, project_url
def test_delete_tags():
"""测试标签删除功能"""
project_id, project_name, project_url = test_search_repositories()
# 准备测试数据:先创建一些标签用于删除测试
test_tags_for_deletion = ["v1.0.0", "v1.1.0", "v0.9.0"]
print(f"\n=== 开始删除标签测试 ===")
# 测试删除存在的标签
deleted_tags = []
for tag_name in test_tags_for_deletion:
result = delete_tags(
project_id=project_id,
tag_name=tag_name
)
# 验证delete_tags的输出字段
assert "status" in result, "删除标签结果缺少status字段"
assert result["status"] in ["success", "error"], f"标签 {tag_name} 删除结果状态异常: {result.get('status')}"
assert "message" in result, "删除标签结果缺少message字段"
if result["status"] == "success":
deleted_tags.append(tag_name)
print(f"✅ 成功删除标签: {tag_name} - {result['message']}")
else:
print(f"ℹ️ 删除标签失败或不存在: {tag_name} - {result['message']}")
print(f"✅ 批量删除操作完成,实际删除 {len(deleted_tags)} 个标签")
# 测试删除不存在的标签
print("\n=== 测试删除不存在的标签 ===")
non_existent_tag = "non-existent-tag-12345"
not_found_result = delete_tags(
project_id=project_id,
tag_name=non_existent_tag
)
# 验证不存在标签的处理
assert not_found_result["status"] == "success", f"删除不存在标签应该返回success状态,实际: {not_found_result['status']}"
assert "message" in not_found_result, "错误结果应该包含消息"
assert non_existent_tag in not_found_result["message"], "错误消息应该包含标签名称"
print(f"✅ 删除不存在标签验证成功: {not_found_result['message']}")
# 测试重复删除同一个标签(应该返回not_found)
print("\n=== 测试重复删除标签 ===")
if deleted_tags:
duplicate_delete_result = delete_tags(
project_id=project_id,
tag_name=deleted_tags[0]
)
# 验证重复删除的结果
assert duplicate_delete_result["status"] == "success", f"重复删除标签应该返回success状态,实际: {duplicate_delete_result['status']}"
print(f"✅ 重复删除标签验证成功: {duplicate_delete_result['message']}")
# 测试错误处理 - 无效项目ID
print("\n=== 测试错误处理 ===")
error_result = delete_tags(
project_id="99999999",
tag_name="v2.0.0"
)
assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}"
assert "message" in error_result, "错误结果应该包含错误信息"
print(f"✅ 错误处理验证成功: {error_result['message']}")
print(f"\n✅✅✅ 所有删除标签测试通过!")
return project_id, project_name, project_url
def run_all_tests():
"""运行所有测试"""
try:
# 检查环境变量
if not os.getenv("GITLAB_TOKEN"):
print("错误: 缺少GITLAB_TOKEN环境变量,无法执行测试")
print("请设置环境变量后重试: export GITLAB_TOKEN=your_token")
return False
# project_id, project_name, project_url = test_create_repository()
# project_id, project_name, project_url = test_fork_repository()
project_id, project_name, project_url = test_delete_repository()
# project_id, project_name, project_url = test_search_repositories()
# project_id, project_name, project_url = test_create_or_update_file()
# project_id, project_name, project_url = test_push_files()
# project_id, project_name, project_url = test_get_file_contents()
# project_id, project_name, project_url = test_create_issue()
# project_id, project_name, project_url = test_get_issues()
# project_id, project_name, project_url = test_create_merge_request()
# project_id, project_name, project_url = test_get_merge_request_diff()
# project_id, project_name, project_url = test_create_branches()
# project_id, project_name, project_url = test_delete_branches()
# project_id, project_name, project_url = test_create_tags()
# project_id, project_name, project_url = test_delete_tags()
print("\n✅✅✅ 所有测试通过!")
print(f"注意:测试项目 {project_name} (ID: {project_id}) 请手动删除")
print(f"项目URL: {project_url}")
return True
except Exception as e:
print(f"\n❌ 测试失败: {e}")
traceback.print_exc()
return False
if __name__ == "__main__":
print("开始GitLab MCP Server端到端测试...")
print("警告:此测试会在您的GitLab账号中创建真实的项目和资源!")
run_all_tests()