Skip to main content
Glama

Stock MCP Server

by huweihua123
app.py3.47 kB
""" SSE + HTTP POST 双向通信股票数据服务器 主应用入口文件 """ import logging import sys from contextlib import asynccontextmanager from typing import Dict, Any from pathlib import Path from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles # 处理相对导入问题 if __name__ == "__main__": # 如果直接运行此文件,添加项目根目录到 Python 路径 project_root = Path(__file__).parent.parent.parent sys.path.insert(0, str(project_root)) from src.server.routes.sse_routes import router as sse_router from src.server.routes.api_routes import router as api_router from src.server.services.sse_service import SSEManager from src.server.services.message_service import MessageService from src.server.utils.event_manager import EventManager from src.config.settings import get_settings else: # 正常的相对导入 from .routes.sse_routes import router as sse_router from .routes.api_routes import router as api_router from .services.sse_service import SSEManager from .services.message_service import MessageService from .utils.event_manager import EventManager from ..config.settings import get_settings # 配置日志 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", stream=sys.stdout, ) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """应用生命周期管理""" logger.info("🚀 启动 SSE + HTTP POST 双向通信服务器") # 启动时的初始化 settings = get_settings() logger.info(f"📋 服务配置: {settings.app_name}") yield # 关闭时的清理 logger.info("🛑 关闭 SSE + HTTP POST 双向通信服务器") def create_app() -> FastAPI: """创建 FastAPI 应用实例""" settings = get_settings() app = FastAPI( title=settings.app_name, description="SSE + HTTP POST 双向通信股票数据服务", version="1.0.0", lifespan=lifespan, ) # 配置 CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # 注册路由 app.include_router(sse_router, prefix="/sse", tags=["SSE"]) app.include_router(api_router, prefix="/api", tags=["API"]) # 静态文件 app.mount("/static", StaticFiles(directory="src/client/static"), name="static") @app.get("/") async def root(request: Request): """API测试页面 - 主页""" from fastapi.responses import FileResponse return FileResponse("src/client/static/api-test.html") @app.get("/health") async def health_check(): """健康检查""" try: sse_manager_instance = SSEManager() connections_count = len(sse_manager_instance.get_active_connections()) except Exception: connections_count = 0 return { "status": "healthy", "service": "SSE + HTTP POST 双向通信服务", "connections": connections_count, } return app # 创建应用实例 app = create_app() if __name__ == "__main__": import uvicorn logger.info("🚀 直接启动 FastAPI 服务器") uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/huweihua123/stock-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server