Skip to main content
Glama
test_sum_int_with_agent_streamable_http.py8.66 kB
#!/usr/bin/env python3
"""
Test script that exercises an MCP server through an agent (Streamable HTTP transport).

The script starts the ``sum_int`` MCP server as a child process, waits until it
answers HTTP requests, builds a ReAct agent on top of the tools the server
exposes, and asks the agent a couple of arithmetic questions.
"""

import asyncio
import sys
from pathlib import Path
import os
from dotenv import load_dotenv
import logging
import subprocess
import time
import traceback
from typing import Optional
import httpx

# Project root, resolved from this file so the script does not depend on the
# current working directory.
PROJECT_ROOT = Path(__file__).parent.parent

# Add the server source directory to the Python path.
sys.path.insert(0, str(PROJECT_ROOT / "src" / "mcp_server"))

# Try to load the .env file.
env_path = PROJECT_ROOT / ".env"
if env_path.exists():
    load_dotenv(dotenv_path=env_path)

# Raise the log level so more LangChain internals are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

try:
    from langchain_mcp_adapters.tools import load_mcp_tools
    from langchain_openai import ChatOpenAI
    from langchain_core.messages import HumanMessage, AIMessage, ToolMessage
    from langgraph.prebuilt import create_react_agent
    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client
    LANGCHAIN_AVAILABLE = True
except ImportError as e:
    LANGCHAIN_AVAILABLE = False
    print("Warning: LangChain dependencies not found. Install with: pip install langchain langchain-openai langchain-mcp-adapters langgraph")
    print(f"Import error: {e}")


async def wait_for_server(
    url: str,
    timeout: int = 30,
    process: Optional[subprocess.Popen] = None,
) -> bool:
    """Poll *url* until the server answers or *timeout* seconds elapse.

    Args:
        url: Endpoint to probe with GET requests.
        timeout: Maximum number of seconds to keep polling.
        process: Optional child process hosting the server. When given,
            polling stops early as soon as that process has exited.

    Returns:
        True once a response with a status code other than 502 is received,
        False on timeout or if *process* exited first.
    """
    # A monotonic clock keeps the deadline immune to wall-clock adjustments.
    deadline = time.monotonic() + timeout
    # One client is reused for all probes instead of building one per attempt.
    async with httpx.AsyncClient() as client:
        while time.monotonic() < deadline:
            if process is not None and process.poll() is not None:
                # The server died; waiting any longer cannot succeed.
                return False
            try:
                response = await client.get(url)
            except httpx.HTTPError:
                # Connection refused / timed out: the server is not up yet.
                pass
            else:
                # A Streamable HTTP server returns 405 (Method Not Allowed)
                # rather than 200 because it only accepts POST requests, so
                # the check is simply that the status code is not 502.
                if response.status_code != 502:
                    return True
            await asyncio.sleep(1)
    return False


def _stop_server(process: subprocess.Popen, grace_seconds: float = 5) -> None:
    """Terminate *process*, escalating to kill if it ignores the request."""
    if process.poll() is not None:
        return  # already exited
    process.terminate()
    try:
        process.wait(timeout=grace_seconds)
    except subprocess.TimeoutExpired:
        process.kill()
        process.wait()  # reap the child so it does not linger as a zombie


if LANGCHAIN_AVAILABLE:

    def _print_conversation(messages) -> None:
        """Print every message of an agent run, labelled by its role."""
        for j, msg in enumerate(messages):
            if isinstance(msg, HumanMessage):
                print(f" Message {j+1} [Human]: {msg.content}")
            elif isinstance(msg, AIMessage):
                print(f" Message {j+1} [AI]: {msg.content}")
                tool_calls = getattr(msg, "tool_calls", None)
                if tool_calls:
                    print(f" Tool Calls: {tool_calls}")
            elif isinstance(msg, ToolMessage):
                print(f" Message {j+1} [Tool]: {msg.content}")

    async def test_agent_with_mcp_streamable_http():
        """Run the agent against the MCP server over Streamable HTTP.

        Raises:
            ValueError: If LLM_BASE_URL, LLM_API_KEY or LLM_MODEL is not set.
            RuntimeError: If the MCP server does not become reachable.
        """
        # Read the configuration from environment variables.
        base_url = os.environ.get("LLM_BASE_URL")
        api_key = os.environ.get("LLM_API_KEY")
        model = os.environ.get("LLM_MODEL")
        mcp_port = os.environ.get("MCP_SERVER_PORT", "8000")

        # Make sure the required environment variables are set.
        if not base_url:
            raise ValueError("LLM_BASE_URL environment variable is not set")
        if not api_key:
            raise ValueError("LLM_API_KEY environment variable is not set")
        if not model:
            raise ValueError("LLM_MODEL environment variable is not set")

        # Start the MCP server (Streamable HTTP transport).
        print("Starting MCP server with Streamable HTTP transport...")
        server_script = PROJECT_ROOT / "src" / "mcp_server" / "sum_int.py"
        # The server output is never consumed here, so it is discarded rather
        # than piped: an unread PIPE blocks the child once the OS buffer fills.
        server_process = subprocess.Popen(
            [sys.executable, str(server_script), "streamable-http"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

        try:
            # Wait for the server to come up.
            server_url = f"http://127.0.0.1:{mcp_port}/mcp"
            if not await wait_for_server(server_url, process=server_process):
                exit_code = server_process.poll()
                if exit_code is not None:
                    raise RuntimeError(
                        f"MCP server exited during startup with code {exit_code}"
                    )
                raise RuntimeError("MCP server failed to start within timeout")

            print(f"MCP server started at {server_url}")

            # Build the chat model from the configured LLM settings.
            try:
                llm = ChatOpenAI(
                    model=model,
                    base_url=base_url,
                    api_key=api_key,
                    temperature=0,  # lower randomness for more consistent results
                )
            except Exception as e:
                print(f"Failed to initialize model: {e}")
                raise

            # Connect to the MCP server with the Streamable HTTP client.
            async with streamablehttp_client(server_url) as (read, write, _):
                async with ClientSession(read, write) as session:
                    # Initialize the connection.
                    await session.initialize()

                    # Load the MCP tools.
                    tools = await load_mcp_tools(session)
                    print(f"Loaded tools: {[tool.name for tool in tools]}")

                    # Create the agent.
                    agent = create_react_agent(llm, tools)

                    # Questions to ask, paired with the expected sums.
                    test_questions = [
                        "What is 15 plus 25?",
                        "Calculate the sum of 123 and 456",
                    ]
                    expected_results = [40, 579]

                    cases = zip(test_questions, expected_results)
                    for number, (question, expected) in enumerate(cases, start=1):
                        print(f"\n{'='*50}")
                        print(f"Testing question {number}: {question}")
                        print(f"{'='*50}")

                        try:
                            # Show the message sent to the agent.
                            print("\n[User Message] Sending to agent:")
                            print(f" Content: {question}")

                            # Invoke the agent.
                            response = await agent.ainvoke({
                                "messages": [HumanMessage(content=question)]
                            })

                            # Show the full conversation history.
                            print("\n[Agent Messages] Full conversation history:")
                            _print_conversation(response['messages'])

                            # Print the final response.
                            agent_response = response['messages'][-1].content
                            print("\n[Final Response] Agent final response:")
                            print(f" {agent_response}")

                            # Message content may be a list of content blocks
                            # rather than a string; flatten it to text so the
                            # substring check below is meaningful.
                            if isinstance(agent_response, str):
                                response_text = agent_response
                            else:
                                response_text = str(agent_response)

                            # Check that the response contains the expected value.
                            if str(expected) in response_text:
                                print(f"\n✓ Test passed: Response contains expected result {expected}")
                            else:
                                print(f"\n⚠ Warning: Response may not contain expected result {expected}")

                        except Exception as e:
                            print(f"Error processing question '{question}': {e}")
                            traceback.print_exc()

            print(f"\n{'='*50}")
            print("Agent with MCP Tests Completed")
            print(f"{'='*50}")

        finally:
            # Always stop the server process, including on startup failure.
            _stop_server(server_process)

    def main():
        """Entry point: run the agent test and return a process exit code."""
        print("=== Agent with MCP Integration Test (Streamable HTTP) ===")

        base_url = os.environ.get("LLM_BASE_URL")
        model = os.environ.get("LLM_MODEL")

        print(f"Using API base URL: {base_url}")
        print(f"Using model: {model}")

        try:
            asyncio.run(test_agent_with_mcp_streamable_http())
        except Exception as e:
            print(f"Test failed with error: {e}")
            traceback.print_exc()
            return 1
        return 0

else:

    def main():
        """Entry point used when the LangChain dependencies are missing."""
        print("Cannot run agent tests without LangChain dependencies.")
        print("Install dependencies with: pip install langchain langchain-openai langchain-mcp-adapters langgraph")
        return 0


if __name__ == "__main__":
    sys.exit(main())

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/ChaosXu/pymcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.