Skip to main content
Glama

Stock Analysis MCP Server

test_sse_fixed.py2.3 kB
#!/usr/bin/env python3 """ 测试修复版SSE服务器 """ import requests import json import time def test_sse_endpoints(): """测试SSE端点""" base_url = "http://localhost:8001" print("🚀 测试修复版SSE服务器...") print("=" * 60) # 1. 测试服务器状态 try: response = requests.get(f"{base_url}/") print("✅ 服务器状态检查 - 成功") print(json.dumps(response.json(), ensure_ascii=False, indent=2)) except Exception as e: print(f"❌ 服务器状态检查 - 失败: {e}") return print("\n" + "=" * 60) # 2. 测试API接口 try: response = requests.get(f"{base_url}/api/realtime?symbol=000001") print("✅ 实时行情API - 成功") data = response.json() print(f"股票: {data.get('股票名称', 'N/A')} ({data.get('股票代码', 'N/A')})") print(f"最新价: {data.get('最新价', 'N/A')}") print(f"涨跌幅: {data.get('涨跌幅', 'N/A')}%") except Exception as e: print(f"❌ 实时行情API - 失败: {e}") print("\n" + "=" * 60) # 3. 测试SSE流(简单测试) try: print("🔄 测试SSE实时数据流...") response = requests.get(f"{base_url}/stream/realtime?symbol=000001", stream=True, timeout=10) if response.headers.get('content-type') == 'text/event-stream': print("✅ SSE内容类型正确: text/event-stream") # 读取前几行数据 lines_read = 0 for line in response.iter_lines(decode_unicode=True): if line: print(f"SSE数据: {line}") lines_read += 1 if lines_read >= 5: # 只读取前5行 break print("✅ SSE数据流测试完成") else: print(f"❌ SSE内容类型错误: {response.headers.get('content-type')}") except requests.exceptions.Timeout: print("✅ SSE连接超时(正常,说明流正在工作)") except Exception as e: print(f"❌ SSE流测试 - 失败: {e}") print("\n" + "=" * 60) print("🎉 测试完成!") if __name__ == "__main__": test_sse_endpoints()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jwangkun/stock_mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server