Skip to main content
Glama

MCP Server

by ningwenjie
test_server.py13.2 kB
""" MCP服务器测试脚本 """ import os import sys import json import requests import numpy as np import time # 测试配置 MCP_SERVER_URL = "http://localhost:8000" TEST_FILE_PATH = "test_file.txt" TEST_VECTOR_DIMENSION = 1536 def test_health_check(): """测试健康检查接口""" print("\n测试健康检查接口...") try: response = requests.get(f"{MCP_SERVER_URL}/health") if response.status_code == 200 and response.json()["status"] == "healthy": print("✅ 健康检查通过") return True else: print(f"❌ 健康检查失败: {response.text}") return False except Exception as e: print(f"❌ 健康检查异常: {str(e)}") return False def test_file_module(): """测试文件访问模块""" print("\n测试文件访问模块...") # 创建测试文件 with open(TEST_FILE_PATH, "w") as f: f.write("这是一个测试文件,用于测试MCP服务器的文件访问模块。") try: # 测试文件上传 print("测试文件上传...") files = {"file": open(TEST_FILE_PATH, "rb")} response = requests.post(f"{MCP_SERVER_URL}/files/upload", files=files) files["file"].close() if response.status_code != 200: print(f"❌ 文件上传失败: {response.text}") return False file_info = response.json() file_id = file_info["id"] print(f"✅ 文件上传成功: ID={file_id}") # 测试文件列表 print("测试文件列表...") response = requests.get(f"{MCP_SERVER_URL}/files/list") if response.status_code != 200: print(f"❌ 获取文件列表失败: {response.text}") return False files_list = response.json() if not any(f["id"] == file_id for f in files_list): print(f"❌ 上传的文件未在列表中找到") return False print(f"✅ 文件列表获取成功,找到上传的文件") # 测试文件下载 print("测试文件下载...") response = requests.get(f"{MCP_SERVER_URL}/files/download/{file_id}") if response.status_code != 200: print(f"❌ 文件下载失败: {response.text}") return False print(f"✅ 文件下载成功") # 测试文件删除 print("测试文件删除...") response = requests.delete(f"{MCP_SERVER_URL}/files/{file_id}") if response.status_code != 200: print(f"❌ 文件删除失败: {response.text}") return False print(f"✅ 文件删除成功") # 验证文件已删除 response = requests.get(f"{MCP_SERVER_URL}/files/list") files_list = response.json() if any(f["id"] == file_id for f in files_list): print(f"❌ 文件删除后仍在列表中") return False print(f"✅ 文件访问模块测试通过") return True except Exception as e: print(f"❌ 文件访问模块测试异常: {str(e)}") return False finally: # 清理测试文件 if os.path.exists(TEST_FILE_PATH): os.remove(TEST_FILE_PATH) def test_database_module(): """测试数据库连接模块""" print("\n测试数据库连接模块...") try: # 测试插入记录 print("测试插入记录...") test_data = { "collection": "test_collection", "data": { "title": "测试文档", "content": "这是一个测试文档,用于测试MCP服务器的数据库连接模块。", "tags": ["测试", "MCP", "数据库"] } } response = requests.post( f"{MCP_SERVER_URL}/database/insert", json=test_data ) if response.status_code != 200: print(f"❌ 插入记录失败: {response.text}") return False record_info = response.json() record_id = record_info["id"] print(f"✅ 插入记录成功: ID={record_id}") # 测试查询记录 print("测试查询记录...") response = requests.get( f"{MCP_SERVER_URL}/database/find/test_collection" ) if response.status_code != 200: print(f"❌ 查询记录失败: {response.text}") return False records = response.json() if not any(r["id"] == record_id for r in records): print(f"❌ 插入的记录未在查询结果中找到") return False print(f"✅ 查询记录成功,找到插入的记录") # 测试更新记录 print("测试更新记录...") update_data = { "title": "更新后的测试文档", "updated": True } response = requests.put( f"{MCP_SERVER_URL}/database/update/test_collection/{record_id}", json=update_data ) if response.status_code != 200: print(f"❌ 更新记录失败: {response.text}") return False updated_record = response.json() if not updated_record["data"].get("updated"): print(f"❌ 记录未正确更新") return False print(f"✅ 更新记录成功") # 测试删除记录 print("测试删除记录...") response = requests.delete( f"{MCP_SERVER_URL}/database/delete/test_collection/{record_id}" ) if response.status_code != 200: print(f"❌ 删除记录失败: {response.text}") return False print(f"✅ 删除记录成功") # 验证记录已删除 response = requests.get( f"{MCP_SERVER_URL}/database/find/test_collection" ) records = response.json() if any(r["id"] == record_id for r in records): print(f"❌ 记录删除后仍在查询结果中") return False print(f"✅ 数据库连接模块测试通过") return True except Exception as e: print(f"❌ 数据库连接模块测试异常: {str(e)}") return False def test_api_module(): """测试API集成模块""" print("\n测试API集成模块...") try: # 测试API请求 print("测试API请求...") test_request = { "url": "https://httpbin.org/get", "method": "GET", "params": { "test": "value", "source": "mcp_server_test" } } response = requests.post( f"{MCP_SERVER_URL}/api/request", json=test_request ) if response.status_code != 200: print(f"❌ API请求失败: {response.text}") return False api_response = response.json() if api_response["status_code"] != 200: print(f"❌ API响应状态码错误: {api_response['status_code']}") return False # 验证API响应内容 content = api_response["content"] if not isinstance(content, dict) or not content.get("args") or content["args"].get("source") != "mcp_server_test": print(f"❌ API响应内容错误: {content}") return False print(f"✅ API请求成功,响应内容正确") print(f"✅ API集成模块测试通过") return True except Exception as e: print(f"❌ API集成模块测试异常: {str(e)}") return False def test_vector_module(): """测试向量数据库访问模块""" print("\n测试向量数据库访问模块...") try: # 生成测试向量 vector1 = np.random.rand(TEST_VECTOR_DIMENSION).tolist() vector2 = np.random.rand(TEST_VECTOR_DIMENSION).tolist() query_vector = np.random.rand(TEST_VECTOR_DIMENSION).tolist() # 测试插入向量 print("测试插入向量...") test_vector1 = { "collection": "test_vectors", "vector": vector1, "metadata": { "description": "测试向量1", "source": "测试脚本" } } response = requests.post( f"{MCP_SERVER_URL}/vector/insert", json=test_vector1 ) if response.status_code != 200: print(f"❌ 插入向量1失败: {response.text}") return False vector_info1 = response.json() vector_id1 = vector_info1["id"] print(f"✅ 插入向量1成功: ID={vector_id1}") # 插入第二个向量 test_vector2 = { "collection": "test_vectors", "vector": vector2, "metadata": { "description": "测试向量2", "source": "测试脚本" } } response = requests.post( f"{MCP_SERVER_URL}/vector/insert", json=test_vector2 ) if response.status_code != 200: print(f"❌ 插入向量2失败: {response.text}") return False vector_info2 = response.json() vector_id2 = vector_info2["id"] print(f"✅ 插入向量2成功: ID={vector_id2}") # 测试向量搜索 print("测试向量搜索...") test_query = { "collection": "test_vectors", "vector": query_vector, "top_k": 2 } response = requests.post( f"{MCP_SERVER_URL}/vector/search", json=test_query ) if response.status_code != 200: print(f"❌ 向量搜索失败: {response.text}") return False search_results = response.json() if len(search_results) != 2: print(f"❌ 向量搜索结果数量错误: {len(search_results)}") return False print(f"✅ 向量搜索成功,返回了{len(search_results)}个结果") # 测试删除向量 print("测试删除向量...") response = requests.delete( f"{MCP_SERVER_URL}/vector/test_vectors/{vector_id1}" ) if response.status_code != 200: print(f"❌ 删除向量失败: {response.text}") return False print(f"✅ 删除向量成功") # 验证向量已删除 test_query["top_k"] = 1 response = requests.post( f"{MCP_SERVER_URL}/vector/search", json=test_query ) search_results = response.json() if len(search_results) != 1 or search_results[0]["id"] == vector_id1: print(f"❌ 向量删除后仍在搜索结果中") return False print(f"✅ 向量数据库访问模块测试通过") return True except Exception as e: print(f"❌ 向量数据库访问模块测试异常: {str(e)}") return False def main(): """主函数""" print("===== MCP服务器功能测试 =====") # 等待服务器启动 print("等待MCP服务器启动...") max_retries = 5 retry_count = 0 while retry_count < max_retries: try: response = requests.get(f"{MCP_SERVER_URL}/health") if response.status_code == 200: print("MCP服务器已启动,开始测试...") break except: pass retry_count += 1 print(f"尝试连接MCP服务器 ({retry_count}/{max_retries})...") time.sleep(2) if retry_count >= max_retries: print("❌ 无法连接到MCP服务器,请确保服务器已启动") return False # 运行测试 tests = [ ("健康检查", test_health_check), ("文件访问模块", test_file_module), ("数据库连接模块", test_database_module), ("API集成模块", test_api_module), ("向量数据库访问模块", test_vector_module) ] results = {} all_passed = True for name, test_func in tests: print(f"\n===== 测试 {name} =====") result = test_func() results[name] = result if not result: all_passed = False # 打印测试结果摘要 print("\n===== 测试结果摘要 =====") for name, result in results.items(): status = "✅ 通过" if result else "❌ 失败" print(f"{name}: {status}") if all_passed: print("\n✅ 所有测试通过!MCP服务器功能正常。") else: print("\n❌ 部分测试失败,请检查详细输出。") return all_passed if __name__ == "__main__": success = main() sys.exit(0 if success else 1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ningwenjie/mcp_server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server