Skip to main content
Glama

GitLab MCP Server

by lucky-dersan
test_functions.py43.5 kB
#!/usr/bin/env python3 """ GitLab MCP Server 端到端测试脚本 这个脚本会实际调用GitLab API进行测试,请确保设置了正确的环境变量。 注意:运行此测试会在您的GitLab账号中创建真实的资源。 """ import os from dotenv import load_dotenv import traceback # 加载环境变量 load_dotenv() from functions import ( create_repository, fork_repository, delete_repository, search_repositories, create_or_update_file, push_files, get_file_contents, create_issue, get_issues, create_merge_request, get_merge_request_diff, create_branches, delete_branches, create_tags, delete_tags ) def test_create_repository(): """测试创建仓库功能""" # 创建一个测试项目 project_name = "mcp-test" result = create_repository( name=project_name, description="GitLab MCP测试项目", visibility="private", initialize_with_readme=True ) # 验证create_repository的输出字段 assert "status" in result, "创建仓库结果缺少status字段" assert result["status"] == "success", f"创建仓库失败,状态: {result.get('status')}" assert "message" in result, "创建仓库结果缺少message字段" assert "id" in result, "创建仓库结果缺少id字段" assert "name" in result, "创建仓库结果缺少name字段" assert "web_url" in result, "创建仓库结果缺少web_url字段" assert result["name"] == project_name, "返回的项目名称不匹配" project_id = result["id"] project_url = result["web_url"] print(f"✅ 创建测试项目成功: {project_name} (ID: {project_id})") print(f"项目URL: {project_url}") print(f"返回消息: {result['message']}") return project_id, project_name, project_url def test_fork_repository(): """测试Fork仓库功能""" source_project_id = 1813 source_project_name = "decompose_task" print(f"测试源项目: {source_project_name} (ID: {source_project_id})") # Fork项目(不指定额外参数) fork_result = fork_repository(source_project_id) # 验证fork_repository的输出字段 assert "status" in fork_result, "Fork结果缺少status字段" assert fork_result["status"] == "success", f"Fork失败,状态: {fork_result.get('status')}" assert "message" in fork_result, "Fork结果缺少message字段" assert "id" in fork_result, "Fork结果缺少id字段" assert "name" in fork_result, "Fork结果缺少name字段" assert "web_url" in fork_result, "Fork结果缺少web_url字段" assert "forked_from_id" in fork_result, "Fork结果缺少forked_from_id字段" assert fork_result["forked_from_id"] == source_project_id, "源项目ID不匹配" forked_project_id = fork_result["id"] forked_project_name = fork_result["name"] forked_project_url = fork_result["web_url"] print(f"✅ 成功Fork项目: {forked_project_name} (ID: {forked_project_id})") print(f"Fork项目URL: {forked_project_url}") # 测试fork不存在的项目 non_existent_id = "99999999" error_result = fork_repository(non_existent_id) # 验证错误处理 assert "status" in error_result, "错误结果格式不正确" assert error_result["status"] == "error", f"Fork不存在项目应返回error状态,实际: {error_result.get('status')}" assert "message" in error_result, "错误结果缺少错误信息" print(f"✅ 错误处理验证成功: {error_result['message']}") # 清理:删除创建的项目 print("\n=== 清理测试项目 ===") # 删除fork项目 delete_fork = delete_repository(forked_project_id) if delete_fork.get("status") == "success": print(f"✅ 已删除Fork项目 {forked_project_name}") print("✅✅✅ Fork仓库测试通过!") return forked_project_id, forked_project_name, forked_project_url def test_delete_repository(): """测试删除仓库功能""" # 先创建一个用于删除测试的项目 project_name = "mcp-test-delete" create_result = create_repository( name=project_name, description="用于删除测试的项目", visibility="private", initialize_with_readme=True ) assert create_result["status"] == "success", "创建测试项目失败" project_id = create_result["id"] project_url = create_result["web_url"] print(f"创建用于删除测试的项目: {project_name} (ID: {project_id})") print(f"项目URL: {project_url}") # 删除刚创建的项目 delete_result = delete_repository(project_id) # 验证delete_repository的输出字段 assert "status" in delete_result, "删除结果缺少status字段" assert delete_result["status"] == "success", f"删除项目失败,状态: {delete_result.get('status')}" assert "message" in delete_result, "删除结果缺少message字段" print(f"✅ 成功删除项目: {delete_result['message']}") # 测试删除不存在的项目 non_existent_id = "99999999" error_result = delete_repository(non_existent_id) # 验证错误处理 assert "status" in error_result, "错误结果格式不正确" assert error_result["status"] == "error", f"删除不存在项目应返回error状态,实际: {error_result.get('status')}" assert "message" in error_result, "错误结果缺少错误信息" print(f"✅ 错误处理验证成功: {error_result['message']}") print("✅✅✅ 删除仓库测试通过!") return project_id, project_name, project_url def test_search_repositories(): """测试搜索仓库功能""" search_term = "mcp-test" # 使用更通用的搜索词以确保能找到项目 result = search_repositories(search=search_term, page=1, per_page=100) # 验证search_repositories的输出字段 assert "status" in result, "搜索结果缺少status字段" assert result["status"] == "success", f"搜索失败,状态: {result.get('status')}" assert "message" in result, "搜索结果缺少message字段" assert "projects" in result, "搜索结果缺少projects字段" assert isinstance(result["projects"], list), "projects字段应该是列表类型" assert len(result["projects"]) > 0, "未找到任何项目" # 验证项目列表中每个项目的字段 project = result["projects"][0] assert "id" in project, "项目缺少id字段" assert "name" in project, "项目缺少name字段" assert "path" in project, "项目缺少path字段" print(f"✅ 找到项目: {project['name']} (ID: {project['id']})") print(f"搜索消息: {result['message']}") return project["id"], project["name"], f"https://gitlab.com/{project['path']}" def test_create_or_update_file(): project_id, project_name, project_url = test_search_repositories() """测试文件操作功能""" # 1. 检查README文件内容 readme = get_file_contents(project_id, "README.md") assert "content" in readme, "README.md 文件内容获取失败" print("✅ 读取README.md成功") branch="master" # 2. 创建一个新文件 create_result = create_or_update_file( project_id=project_id, file_path="test.py", content="print('Hello GitLab MCP!')", commit_message="添加测试文件", branch=branch ) # 验证create_or_update_file的输出字段 assert "status" in create_result, "创建文件结果缺少status字段" assert create_result["status"] == "success", f"创建文件失败,状态: {create_result.get('status')}" assert "message" in create_result, "创建文件结果缺少message字段" assert "file_path" in create_result, "创建文件结果缺少file_path字段" assert create_result["file_path"] == "test.py", "返回的文件路径不匹配" print(f"✅ 创建文件成功: {create_result['message']}") # 3. 更新文件 update_result = create_or_update_file( project_id=project_id, file_path="test.py", content="print('Updated content!')", commit_message="更新测试文件", branch=branch ) assert update_result["status"] == "success", "更新文件失败" print("✅ 更新文件成功") # 4. 获取更新后的文件内容 updated_content = get_file_contents(project_id, "test.py") assert updated_content["content"] == "print('Updated content!')", "文件内容更新验证失败" print("✅ 文件内容验证成功") return project_id, project_name, project_url def test_push_files(): project_id, project_name, project_url = test_search_repositories() """测试批量推送文件功能""" # 准备多个文件 files = [ { "file_path": "src/gitlab_mcp_server.py", "content": "# GitLab MCP Server\n\ndef main():\n print('Hello MCP Server')" }, { "file_path": "src/helper.py", "content": "def helper_function():\n return 'Helper function'" }, { "file_path": "mcp_tool.json", "content": "{\n \"name\": \"MCP Tool\",\n \"version\": \"1.0.0\"\n}" } ] branch="dev003" ref_branch="master" # 批量推送文件 result = push_files( project_id=project_id, files=files, commit_message="添加多个文件", branch=branch, ref_branch=ref_branch # 使用一个不存在的分支来测试创建新分支 ) # 验证推送结果 assert result["status"] == "success", "批量推送文件失败" assert "commit_id" in result, "未返回提交ID" assert result["files_count"] == 3, f"文件数量不匹配,应为3,实际为{result['files_count']}" print(f"✅ 批量推送 {result['files_count']} 个文件成功,提交ID: {result.get('commit_short_id')}") # 验证文件是否真的被创建了 for file_data in files: content = get_file_contents(project_id, file_data["file_path"], branch) assert "content" in content, f"文件 {file_data['file_path']} 未创建成功" assert content["content"] == file_data["content"], f"文件 {file_data['file_path']} 内容不匹配" print("✅ 所有文件内容验证成功") # 测试更新和删除操作 update_files = [ { "file_path": "src/helper.py", "content": "# 已更新\ndef main():\n print('Updated main function')", "action": "update" }, { "file_path": "mcp_tool.json", "action": "delete" } ] # 批量操作 update_result = push_files( project_id=project_id, files=update_files, commit_message="更新和删除文件", branch=branch, ref_branch=branch # 使用同一分支进行更新和删除操作 ) # 验证push_files的输出字段 assert "status" in update_result, "批量更新文件结果缺少status字段" assert update_result["status"] == "success", f"批量更新文件失败,状态: {update_result.get('status')}" assert "message" in update_result, "批量更新文件结果缺少message字段" assert "commit_id" in update_result, "批量更新文件结果缺少commit_id字段" assert "commit_short_id" in update_result, "批量更新文件结果缺少commit_short_id字段" assert "files_count" in update_result, "批量更新文件结果缺少files_count字段" print(f"✅ 批量更新和删除文件成功: {update_result['message']}") print(f"提交ID: {update_result['commit_short_id']}, 文件数: {update_result['files_count']}") # 验证更新的文件 main_content = get_file_contents(project_id, "src/helper.py", branch) assert "# 已更新" in main_content["content"], "文件更新验证失败" print("✅ 文件更新验证成功") # 验证删除的文件不存在 deleted_content = get_file_contents(project_id, "config.json", branch) assert "error" in deleted_content, "文件删除验证失败" print("✅ 文件删除验证成功") return project_id, project_name, project_url def test_get_file_contents(): """测试获取文件内容功能""" project_id, project_name, project_url = test_search_repositories() # 获取README.md文件内容 readme = get_file_contents(project_id, "README.md") # 验证get_file_contents的输出字段 assert "status" in readme, "获取文件内容结果缺少status字段" assert readme["status"] == "success", f"获取文件内容失败,状态: {readme.get('status')}" assert "message" in readme, "获取文件内容结果缺少message字段" assert "content" in readme, "获取文件内容结果缺少content字段" print(f"✅ 读取 {project_name} 的 README.md 成功: {readme['message']}") print(f"文件内容预览: {readme['content'][:100]}...") return project_id, project_name, project_url def test_create_issue(): """测试创建问题功能""" project_id, project_name, project_url = test_search_repositories() # 创建一个测试问题 issue_title = "测试问题test" issue_description = "这是一个通过API创建的测试问题test" issue_labels = ["测试test", "自动化test"] issue = create_issue( project_id=project_id, title=issue_title, description=issue_description, labels=issue_labels ) # 验证create_issue的输出字段 assert "status" in issue, "创建问题结果缺少status字段" assert issue["status"] == "success", f"创建问题失败,状态: {issue.get('status')}" assert "message" in issue, "创建问题结果缺少message字段" assert "id" in issue, "创建问题结果缺少id字段" assert "web_url" in issue, "创建问题结果缺少web_url字段" assert "state" in issue, "创建问题结果缺少state字段" print(f"✅ 创建问题成功: {issue['message']}") print(f"问题详情 - ID: {issue['id']}, 状态: {issue['state']}, URL: {issue['web_url']}") return project_id, project_name, project_url def test_get_issues(): """测试获取issues功能,参考test_create_issue函数实现""" project_id, project_name, project_url = test_search_repositories() print("\n=== 开始获取issues测试 ===") # 1. 基础测试:获取所有issues all_issues_result = get_issues(project_id=project_id) # 验证基本返回结果 assert "status" in all_issues_result, "未返回状态信息" assert all_issues_result["status"] == "success", f"获取issues失败,状态: {all_issues_result.get('status')}" assert "issues" in all_issues_result, "未返回issues列表" assert "total_count" in all_issues_result, "未返回总数信息" assert "page" in all_issues_result, "未返回页码信息" assert "per_page" in all_issues_result, "未返回每页数量信息" issues_count = len(all_issues_result["issues"]) print(f"✅ 成功获取 {issues_count} 个issues") # 验证issues的结构(如果有issues的话) if issues_count > 0: first_issue = all_issues_result["issues"][0] # 验证issue的必要字段,参考create_issue的返回格式 assert "id" in first_issue, "issue缺少ID字段" assert "title" in first_issue, "issue缺少标题字段" assert "state" in first_issue, "issue缺少状态字段" assert "web_url" in first_issue, "issue缺少URL字段" assert "description" in first_issue, "issue缺少描述字段" assert "labels" in first_issue, "issue缺少标签字段" assert "author" in first_issue, "issue缺少作者字段" assert "created_at" in first_issue, "issue缺少创建时间字段" assert "updated_at" in first_issue, "issue缺少更新时间字段" print(f"✅ 第一个issue验证成功: {first_issue['title']} (ID: {first_issue['id']}, 状态: {first_issue['state']})") # 2. 测试状态过滤 print("\n=== 测试状态过滤 ===") # 获取开放状态的issues opened_issues_result = get_issues(project_id=project_id, state="opened") assert opened_issues_result["status"] == "success", "获取开放issues失败" opened_count = len(opened_issues_result["issues"]) print(f"✅ 获取开放状态issues: {opened_count} 个") # 验证开放issues的状态 for issue in opened_issues_result["issues"]: assert issue["state"] == "opened", f"状态过滤失败,期望: opened, 实际: {issue['state']}" # 获取关闭状态的issues closed_issues_result = get_issues(project_id=project_id, state="closed") assert closed_issues_result["status"] == "success", "获取关闭issues失败" closed_count = len(closed_issues_result["issues"]) print(f"✅ 获取关闭状态issues: {closed_count} 个") # 验证关闭issues的状态 for issue in closed_issues_result["issues"]: assert issue["state"] == "closed", f"状态过滤失败,期望: closed, 实际: {issue['state']}" # 3. 测试标签过滤 print("\n=== 测试标签过滤 ===") # 使用单个标签过滤 labeled_issues_result = get_issues(project_id=project_id, labels="测试test") assert labeled_issues_result["status"] == "success", "标签过滤失败" labeled_count = len(labeled_issues_result["issues"]) print(f"✅ 获取包含'测试test'标签的issues: {labeled_count} 个") # 使用多个标签过滤 multi_labeled_issues_result = get_issues(project_id=project_id, labels=["测试test", "自动化test"]) assert multi_labeled_issues_result["status"] == "success", "多标签过滤失败" multi_labeled_count = len(multi_labeled_issues_result["issues"]) print(f"✅ 获取包含多个标签的issues: {multi_labeled_count} 个") # 4. 测试分页功能 print("\n=== 测试分页功能 ===") # 测试第一页,每页2个 page1_result = get_issues(project_id=project_id, page=1, per_page=2) assert page1_result["status"] == "success", "分页测试失败" assert page1_result["page"] == 1, f"页码不匹配,期望: 1, 实际: {page1_result['page']}" assert page1_result["per_page"] == 2, f"每页数量不匹配,期望: 2, 实际: {page1_result['per_page']}" page1_count = len(page1_result["issues"]) print(f"✅ 分页测试成功 - 第1页: {page1_count} 个issues") # 如果有足够的issues,测试第二页 if issues_count > 2: page2_result = get_issues(project_id=project_id, page=2, per_page=2) assert page2_result["status"] == "success", "第二页获取失败" assert page2_result["page"] == 2, f"页码不匹配,期望: 2, 实际: {page2_result['page']}" page2_count = len(page2_result["issues"]) print(f"✅ 第2页测试成功: {page2_count} 个issues") # 5. 测试组合过滤条件 print("\n=== 测试组合过滤条件 ===") combined_result = get_issues( project_id=project_id, state="opened", labels="测试test", page=1, per_page=5 ) assert combined_result["status"] == "success", "组合过滤失败" combined_count = len(combined_result["issues"]) print(f"✅ 组合过滤测试成功: 开放状态+测试标签 = {combined_count} 个issues") # 验证组合过滤的结果 for issue in combined_result["issues"]: assert issue["state"] == "opened", f"组合过滤状态验证失败,期望: opened, 实际: {issue['state']}" # 6. 测试错误处理 print("\n=== 测试错误处理 ===") # 使用无效的项目ID invalid_project_result = get_issues(project_id="99999999") assert invalid_project_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {invalid_project_result['status']}" assert "message" in invalid_project_result, "错误结果应该包含错误信息" print(f"✅ 错误处理验证成功: {invalid_project_result['message']}") # 7. 性能和数据完整性测试 print("\n=== 数据完整性验证 ===") # 获取详细的issue信息并验证 if issues_count > 0: detailed_result = get_issues(project_id=project_id, per_page=1) if len(detailed_result["issues"]) > 0: issue = detailed_result["issues"][0] # 验证URL格式 assert "http" in issue["web_url"], f"URL格式可能不正确: {issue['web_url']}" # 验证时间格式(应该包含日期) assert len(issue["created_at"]) > 10, f"创建时间格式可能不正确: {issue['created_at']}" # 验证作者不为空 assert issue["author"] != "", f"作者信息不应为空: {issue['author']}" print(f"✅ 数据完整性验证成功") print(f" - 标题: {issue['title']}") print(f" - 作者: {issue['author']}") print(f" - 创建时间: {issue['created_at']}") print(f" - 标签数量: {len(issue['labels'])}") print(f"\n✅✅✅ 所有获取issues测试通过!") print(f"总结:") print(f" - 总issues数量: {issues_count}") print(f" - 开放状态: {opened_count}") print(f" - 关闭状态: {closed_count}") print(f" - 包含测试标签: {labeled_count}") return project_id, project_name, project_url def test_create_merge_request(): project_id, project_name, project_url = test_search_repositories() branch_name = f"feature" # 1. 创建一个新分支 file_result = create_or_update_file( project_id=project_id, file_path=f"feature-{branch_name}.txt", content=f"This is a test file for branch {branch_name}", commit_message=f"创建分支 {branch_name}", branch=branch_name ) assert file_result["status"] == "success", f"分支文件操作验证失败,状态: {file_result['status']}" print(f"✅ 创建分支 {branch_name} 成功") # 2. 创建合并请求 mr_title = f"添加功能 {branch_name}" mr = create_merge_request( project_id=project_id, title=mr_title, source_branch=branch_name, target_branch="master", description="这是一个测试合并请求", draft=True ) # 验证create_merge_request的输出字段 assert "status" in mr, "创建合并请求结果缺少status字段" assert mr["status"] == "success", f"创建合并请求失败,状态: {mr.get('status')}" assert "message" in mr, "创建合并请求结果缺少message字段" assert "id" in mr, "创建合并请求结果缺少id字段" assert "web_url" in mr, "创建合并请求结果缺少web_url字段" assert "state" in mr, "创建合并请求结果缺少state字段" print(f"✅ 创建合并请求成功: {mr['message']}") print(f"合并请求详情 - ID: {mr['id']}, 状态: {mr['state']}, URL: {mr['web_url']}") return project_id, project_name, project_url def test_get_merge_request_diff(): """测试获取合并请求差异功能""" project_id, project_name, project_url = test_search_repositories() branch_name = f"get-diff-test" # 1. 创建一个新分支并添加一些变更 file_result = create_or_update_file( project_id=project_id, file_path=f"diff-test-{branch_name}.txt", content=f"这是用于测试差异的文件内容\n分支: {branch_name}\n添加了一些测试内容", commit_message=f"在分支 {branch_name} 上添加测试文件", branch=branch_name ) assert file_result["status"] == "success", f"分支文件操作验证,状态: {file_result['status']}" print(f"✅ 创建分支 {branch_name} 和测试文件成功") # 2. 创建合并请求 mr_title = f"测试差异获取 {branch_name}" mr = create_merge_request( project_id=project_id, title=mr_title, source_branch=branch_name, target_branch="master", description="这是一个用于测试获取差异的合并请求", draft=True ) assert "id" in mr, "合并请求创建失败" assert "web_url" in mr, "合并请求URL获取失败" merge_request_iid = mr["id"] print(f"✅ 创建合并请求成功,ID: {merge_request_iid}, URL: {mr['web_url']}") # 3. 获取合并请求差异 print(f"\n=== 开始获取合并请求差异测试 ===") diff_result = get_merge_request_diff( project_id=project_id, merge_request_iid=merge_request_iid ) # 验证基本返回结果 assert "status" in diff_result, "差异结果格式不正确" assert diff_result["status"] == "success", f"获取合并请求差异失败,状态: {diff_result.get('status')}" assert "diffs" in diff_result, "差异结果缺少diffs字段" assert "merge_request_iid" in diff_result, "差异结果缺少merge_request_iid字段" assert "project_id" in diff_result, "差异结果缺少project_id字段" # 验证返回的ID正确性 assert diff_result["merge_request_iid"] == merge_request_iid, f"返回的合并请求ID不匹配,期望: {merge_request_iid}, 实际: {diff_result['merge_request_iid']}" assert diff_result["project_id"] == project_id, f"返回的项目ID不匹配,期望: {project_id}, 实际: {diff_result['project_id']}" # 验证diffs列表结构 diffs = diff_result["diffs"] assert isinstance(diffs, list), "diffs应该是一个列表" assert len(diffs) > 0, "应该有至少一个文件变更" print(f"✅ 获取到 {len(diffs)} 个文件变更") # 验证每个diff的结构 for i, diff in enumerate(diffs): assert "old_path" in diff, f"第{i+1}个diff缺少old_path字段" assert "new_path" in diff, f"第{i+1}个diff缺少new_path字段" assert "diff_refs" in diff, f"第{i+1}个diff缺少diff_refs字段" assert "diff" in diff, f"第{i+1}个diff缺少diff字段" print(f" 📄 文件变更 {i+1}: {diff['new_path']}") # 验证我们创建的测试文件是否在变更中 if diff['new_path'] == f"diff-test-{branch_name}.txt": assert diff['new_path'] == f"diff-test-{branch_name}.txt", "文件路径不匹配" print(f" ✅ 找到测试文件的变更: {diff['new_path']}") print("✅ 所有差异结构验证成功") # 4. 测试错误处理 - 无效项目ID print("\n=== 测试错误处理 ===") error_result = get_merge_request_diff( project_id="99999999", merge_request_iid=merge_request_iid ) assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}" assert "message" in error_result, "错误结果应该包含错误信息" print(f"✅ 无效项目ID错误处理验证成功: {error_result['message']}") # 5. 测试错误处理 - 无效合并请求ID invalid_mr_result = get_merge_request_diff( project_id=project_id, merge_request_iid=99999999 ) assert invalid_mr_result["status"] == "error", f"无效合并请求ID应该返回error状态,实际: {invalid_mr_result['status']}" assert "message" in invalid_mr_result, "错误结果应该包含错误信息" print(f"✅ 无效合并请求ID错误处理验证成功: {invalid_mr_result['message']}") # 6. 数据完整性验证 print("\n=== 数据完整性验证 ===") # 验证diff_refs结构 if len(diffs) > 0: first_diff = diffs[0] diff_refs = first_diff["diff_refs"] # diff_refs应该包含base_sha和head_sha等信息 assert hasattr(diff_refs, '__dict__') or isinstance(diff_refs, dict), "diff_refs应该是一个对象或字典" print(f"✅ diff_refs结构验证成功") # 验证diff内容不为空(对于新增文件) if first_diff["new_path"] == f"diff-test-{branch_name}.txt": diff_content = first_diff["diff"] assert diff_content is not None and len(diff_content) > 0, "diff内容不应为空" assert "get-diff-test" in diff_content, "diff内容应包含文件变更" print(f"✅ diff内容验证成功") print(f"\n✅✅✅ 所有获取合并请求差异测试通过!") print(f"测试总结:") print(f" - 项目ID: {project_id}") print(f" - 合并请求ID: {merge_request_iid}") print(f" - 文件变更数: {len(diffs)}") print(f" - 测试分支: {branch_name}") return project_id, project_name, project_url def test_create_branches(): """测试分支创建功能""" project_id, project_name, project_url = test_search_repositories() # 准备测试数据(分支列表) test_branches = [ {"branch_name": "feature", "ref_branch": "master"}, {"branch_name": "dev", "ref_branch": "master"} ] # 批量执行操作并验证结果 for branch_data in test_branches: result = create_branches( project_id=project_id, branch_name=branch_data["branch_name"], ref_branch=branch_data["ref_branch"] ) # 验证create_branches的输出字段 assert "status" in result, "创建分支结果缺少status字段" assert result["status"] in ["success"], f"分支创建失败,状态: {result['status']}" assert "message" in result, "创建分支结果缺少message字段" assert "branch_name" in result, "创建分支结果缺少branch_name字段" assert "commit" in result, "创建分支结果缺少commit字段" # 验证分支名称正确性 assert result["branch_name"] == branch_data["branch_name"], f"分支名称不匹配,预期: {branch_data['branch_name']}, 实际: {result['branch_name']}" # 验证重复创建处理 duplicate_result = create_branches( project_id=project_id, branch_name=branch_data["branch_name"], ref_branch=branch_data["ref_branch"] ) assert duplicate_result["status"] == "success", f"重复创建分支验证失败,预期: success, 实际: {duplicate_result['status']}" # 在新分支上创建文件 file_result = create_or_update_file( project_id=project_id, file_path=f"branch-test-{branch_data['branch_name']}.txt", content=f"这是在分支 {branch_data['branch_name']} 上创建的测试文件", commit_message=f"在分支 {branch_data['branch_name']} 上添加测试文件", branch=branch_data["branch_name"] ) assert file_result["status"] == "success", f"分支文件操作验证失败,状态: {file_result['status']}" # 验证文件内容 file_content = get_file_contents(project_id, f"branch-test-{branch_data['branch_name']}.txt", ref=branch_data["branch_name"]) assert file_content["content"] == f"这是在分支 {branch_data['branch_name']} 上创建的测试文件", f"分支文件内容验证失败,预期: 这是在分支 {branch_data['branch_name']} 上创建的测试文件,实际: {file_content['content']}" print("✅ 批量创建/检查完成") print("✅ 重复创建分支验证成功") print("✅ 分支文件操作验证成功") return project_id, project_name, project_url def test_delete_branches(): project_id, project_name, project_url = test_search_repositories() # 准备测试数据:先创建一些分支用于删除测试 delete_branches_info = ["feature", "nonexistent-branch", "dev003"] print(f"\n=== 开始删除分支测试 ===") # 测试删除存在的分支 deleted_branches = [] for branch_name in delete_branches_info: result = delete_branches( project_id=project_id, branch_name=branch_name ) # 验证delete_branches的输出字段 assert "status" in result, "删除分支结果缺少status字段" assert result["status"] in ["success", "error"], f"删除分支状态异常: {result.get('status')}" assert "message" in result, "删除分支结果缺少message字段" if result["status"] == "success": deleted_branches.append(branch_name) print(f"✅ 成功删除分支: {branch_name} - {result['message']}") else: print(f"⚠️ 删除分支失败: {branch_name} - {result['message']}") print(f"✅ 批量删除 {len(deleted_branches)} 个分支完成") # 测试删除不存在的分支 print("\n=== 测试删除不存在的分支 ===") non_existent_branch = "non-existent-branch-12345" not_found_result = delete_branches( project_id=project_id, branch_name=non_existent_branch ) # 验证不存在分支的处理 assert not_found_result["status"] == "success", f"删除不存在分支应该返回success状态,实际: {not_found_result['status']}" assert "message" in not_found_result, "错误结果应该包含消息" assert non_existent_branch in not_found_result["message"], "错误消息应该包含分支名称" print(f"✅ 删除不存在分支验证成功: {not_found_result['message']}") # 测试删除默认分支(应该失败) print("\n=== 测试删除默认分支 ===") # 获取项目的默认分支 from src.gitlab_mcp_server import get_gitlab_client gl = get_gitlab_client() project = gl.projects.get(project_id) default_branch = project.default_branch default_branch_result = delete_branches( project_id=project_id, branch_name=default_branch ) # 验证默认分支删除保护 assert default_branch_result["status"] == "error", f"删除默认分支应该返回error状态,实际: {default_branch_result['status']}" assert "message" in default_branch_result, "错误结果应该包含错误信息" assert "默认分支" in default_branch_result["message"] or "default branch" in default_branch_result["message"], "错误消息应该提到默认分支" print(f"✅ 默认分支保护验证成功: {default_branch_result['message']}") # 测试重复删除同一个分支(应该返回not_found) print("\n=== 测试重复删除分支 ===") if deleted_branches: duplicate_delete_result = delete_branches( project_id=project_id, branch_name=deleted_branches[0] ) # 验证重复删除的结果 assert duplicate_delete_result["status"] == "success", f"重复删除分支应该返回success状态,实际: {duplicate_delete_result['status']}" print(f"✅ 重复删除分支验证成功: {duplicate_delete_result['message']}") print(f"\n✅✅✅ 所有删除分支测试通过!") return project_id, project_name, project_url def test_create_tags(): """测试标签创建功能""" project_id, project_name, project_url = test_search_repositories() # 准备测试数据(标签列表) test_tags = [ {"tag_name": "v1.0.0", "ref": "master", "message": "第一个版本发布"}, {"tag_name": "v1.1.0", "ref": "master", "message": "功能更新版本"} ] print(f"\n=== 开始创建标签测试 ===") # 批量执行操作并验证结果 for tag_data in test_tags: result = create_tags( project_id=project_id, tag_name=tag_data["tag_name"], ref_branch=tag_data["ref"], message=tag_data["message"] ) # 验证create_tags的输出字段 assert "status" in result, "创建标签结果缺少status字段" assert result["status"] == "success", f"标签创建失败,状态: {result['status']}" assert "message" in result, "创建标签结果缺少message字段" assert "tag_name" in result, "创建标签结果缺少tag_name字段" assert "commit" in result, "创建标签结果缺少commit字段" # 验证标签名称正确性 assert result["tag_name"] == tag_data["tag_name"], f"标签名称不匹配,预期: {tag_data['tag_name']}, 实际: {result['tag_name']}" print(f"✅ 创建标签: {result['tag_name']} - 状态: {result['status']}") print(f" 消息: {result['message']}") # 验证重复创建处理 duplicate_result = create_tags( project_id=project_id, tag_name=tag_data["tag_name"], ref_branch=tag_data["ref"], message=tag_data["message"] ) assert duplicate_result["status"] == "success", f"重复创建标签验证失败,预期: success, 实际: {duplicate_result['status']}" # 测试创建无消息的标签 simple_tag_result = create_tags( project_id=project_id, tag_name="v0.9.0", ref_branch="master" ) assert simple_tag_result["status"] == "success", f"创建简单标签失败,状态: {simple_tag_result['status']}" print(f"✅ 创建简单标签: {simple_tag_result['tag_name']} - 状态: {simple_tag_result['status']}") # 测试错误处理 - 无效项目ID print("\n=== 测试错误处理 ===") error_result = create_tags( project_id="99999999", tag_name="v2.0.0", ref_branch="master", message="测试标签" ) assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}" assert "message" in error_result, "错误结果应该包含错误信息" print(f"✅ 错误处理验证成功: {error_result['message']}") print("✅ 批量创建/检查标签完成") print("✅ 重复创建标签验证成功") print("✅ 简单标签创建验证成功") print("✅✅✅ 所有创建标签测试通过!") return project_id, project_name, project_url def test_delete_tags(): """测试标签删除功能""" project_id, project_name, project_url = test_search_repositories() # 准备测试数据:先创建一些标签用于删除测试 test_tags_for_deletion = ["v1.0.0", "v1.1.0", "v0.9.0"] print(f"\n=== 开始删除标签测试 ===") # 测试删除存在的标签 deleted_tags = [] for tag_name in test_tags_for_deletion: result = delete_tags( project_id=project_id, tag_name=tag_name ) # 验证delete_tags的输出字段 assert "status" in result, "删除标签结果缺少status字段" assert result["status"] in ["success", "error"], f"标签 {tag_name} 删除结果状态异常: {result.get('status')}" assert "message" in result, "删除标签结果缺少message字段" if result["status"] == "success": deleted_tags.append(tag_name) print(f"✅ 成功删除标签: {tag_name} - {result['message']}") else: print(f"ℹ️ 删除标签失败或不存在: {tag_name} - {result['message']}") print(f"✅ 批量删除操作完成,实际删除 {len(deleted_tags)} 个标签") # 测试删除不存在的标签 print("\n=== 测试删除不存在的标签 ===") non_existent_tag = "non-existent-tag-12345" not_found_result = delete_tags( project_id=project_id, tag_name=non_existent_tag ) # 验证不存在标签的处理 assert not_found_result["status"] == "success", f"删除不存在标签应该返回success状态,实际: {not_found_result['status']}" assert "message" in not_found_result, "错误结果应该包含消息" assert non_existent_tag in not_found_result["message"], "错误消息应该包含标签名称" print(f"✅ 删除不存在标签验证成功: {not_found_result['message']}") # 测试重复删除同一个标签(应该返回not_found) print("\n=== 测试重复删除标签 ===") if deleted_tags: duplicate_delete_result = delete_tags( project_id=project_id, tag_name=deleted_tags[0] ) # 验证重复删除的结果 assert duplicate_delete_result["status"] == "success", f"重复删除标签应该返回success状态,实际: {duplicate_delete_result['status']}" print(f"✅ 重复删除标签验证成功: {duplicate_delete_result['message']}") # 测试错误处理 - 无效项目ID print("\n=== 测试错误处理 ===") error_result = delete_tags( project_id="99999999", tag_name="v2.0.0" ) assert error_result["status"] == "error", f"无效项目ID应该返回error状态,实际: {error_result['status']}" assert "message" in error_result, "错误结果应该包含错误信息" print(f"✅ 错误处理验证成功: {error_result['message']}") print(f"\n✅✅✅ 所有删除标签测试通过!") return project_id, project_name, project_url def run_all_tests(): """运行所有测试""" try: # 检查环境变量 if not os.getenv("GITLAB_TOKEN"): print("错误: 缺少GITLAB_TOKEN环境变量,无法执行测试") print("请设置环境变量后重试: export GITLAB_TOKEN=your_token") return False # project_id, project_name, project_url = test_create_repository() # project_id, project_name, project_url = test_fork_repository() project_id, project_name, project_url = test_delete_repository() # project_id, project_name, project_url = test_search_repositories() # project_id, project_name, project_url = test_create_or_update_file() # project_id, project_name, project_url = test_push_files() # project_id, project_name, project_url = test_get_file_contents() # project_id, project_name, project_url = test_create_issue() # project_id, project_name, project_url = test_get_issues() # project_id, project_name, project_url = test_create_merge_request() # project_id, project_name, project_url = test_get_merge_request_diff() # project_id, project_name, project_url = test_create_branches() # project_id, project_name, project_url = test_delete_branches() # project_id, project_name, project_url = test_create_tags() # project_id, project_name, project_url = test_delete_tags() print("\n✅✅✅ 所有测试通过!") print(f"注意:测试项目 {project_name} (ID: {project_id}) 请手动删除") print(f"项目URL: {project_url}") return True except Exception as e: print(f"\n❌ 测试失败: {e}") traceback.print_exc() return False if __name__ == "__main__": print("开始GitLab MCP Server端到端测试...") print("警告:此测试会在您的GitLab账号中创建真实的项目和资源!") run_all_tests()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/lucky-dersan/gitlab-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server