Skip to main content
Glama

国金QMT-MCP

main.py5.2 kB
#!/usr/bin/env python3 """ QuantMCP Main Entry - 模块化架构主入口 提供智能策略生成、回测分析、股票筛选等功能 """ import logging import traceback import asyncio from datetime import datetime # 使用FastMCP 2.0 from fastmcp import FastMCP # 导入模块化组件 from src.config import config from src.utils.xtquant_client import xt_client from src.tools import TradingTool, QMTStrategyTool # 配置日志 import os # 确保logs目录存在 os.makedirs('logs', exist_ok=True) logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('logs/quantmcp.log', encoding='utf-8'), logging.StreamHandler() ] ) logger = logging.getLogger("quantmcp") def init_system(): """初始化系统""" logger.info("=" * 50) logger.info("QuantMCP 模块化架构 v2.0 启动中...") logger.info("=" * 50) # 初始化XTQuant连接 logger.info("正在初始化XTQuant连接...") try: if xt_client.connect(): logger.info("[OK] XTQuant连接成功") else: logger.warning("[WARNING] XTQuant连接失败,将在离线模式下运行") except Exception as e: logger.error(f"[ERROR] XTQuant初始化失败: {e}") logger.info("[OK] 系统初始化完成") # 创建FastMCP实例 mcp = FastMCP("QuantMCP量化交易助手") # 初始化工具实例 trading_tool = TradingTool() qmt_tool = QMTStrategyTool() @mcp.tool() def place_order(symbol: str, quantity: int, price: float, direction: str = "BUY") -> str: """简化下单工具 用户只需要传入股票代码、数量和价格即可下单。 Args: symbol: 股票代码,如 000001.SZ、600000.SH quantity: 买入股数(必须是100的整数倍) price: 下单价格 direction: 交易方向,默认BUY买入,也可以是SELL卖出 Returns: 下单结果 """ try: logger.info(f"MCP调用: place_order({symbol}, {quantity}, {price}, {direction})") result = trading_tool.place_order( symbol=symbol, quantity=quantity, price=price, direction=direction ) return result except Exception as e: logger.error(f"place_order执行失败: {e}") return f"[ERROR] 下单失败: {str(e)}" @mcp.tool() def cancel_order(order_id: str) -> str: """撤单工具 Args: order_id: 订单ID Returns: 撤单结果 """ try: logger.info(f"MCP调用: cancel_order({order_id})") result = trading_tool.cancel_order(order_id=order_id) return result except Exception as e: logger.error(f"cancel_order执行失败: {e}") return f"[ERROR] 撤单失败: {str(e)}" @mcp.tool() def save_qmt_strategy(strategy_name: str, code: str) -> str: """保存自定义策略代码到 QMT 本地策略目录""" try: logger.info(f"MCP调用: save_qmt_strategy({strategy_name})") return qmt_tool.save_strategy(strategy_name, code) except Exception as e: logger.error(f"save_qmt_strategy 执行失败: {e}") return f"[ERROR] 保存策略失败: {str(e)}" @mcp.tool() def generate_ma_strategy(symbol: str = "000001.SZ", short_period: int = 5, long_period: int = 20, strategy_name: str | None = None) -> str: """生成并保存双均线策略示例""" try: logger.info("MCP调用: generate_ma_strategy") return qmt_tool.generate_ma_strategy(symbol, short_period, long_period, strategy_name) except Exception as e: logger.error(f"generate_ma_strategy 执行失败: {e}") return f"[ERROR] 生成策略失败: {str(e)}" def main(): """主函数""" try: # 初始化系统 init_system() # 启动信息 logger.info("[INFO] QuantMCP服务器启动信息:") logger.info(f" * 服务地址: http://{config.server.host}:{config.server.port}") logger.info(f" * 传输方式: {config.server.transport.upper()}") logger.info(f" * XTQuant状态: {'已连接' if xt_client.is_connected() else '未连接'}") logger.info(" * 架构版本: 模块化架构 v2.0") # 启动SSE服务器 logger.info(f"[START] QuantMCP Server starting on http://{config.server.host}:{config.server.port}") # 使用FastMCP 2.0的SSE传输方式运行 mcp.run( transport=config.server.transport, host=config.server.host, port=config.server.port ) except KeyboardInterrupt: logger.info("[STOP] 用户中断,服务器正在关闭...") except Exception as e: logger.error(f"[ERROR] 服务器错误: {e}") logger.error(traceback.format_exc()) raise finally: # 清理资源 try: xt_client.disconnect() logger.info("[OK] 资源清理完成") except: pass if __name__ == "__main__": main()

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/guangxiangdebizi/QMT-MCP'

If you have feedback or need assistance with the MCP directory API, please join our Discord server