Skip to main content
Glama
main.py7.43 kB
import os from contextlib import asynccontextmanager from dotenv import load_dotenv from fastapi import FastAPI, Request from fastapi.responses import JSONResponse # 서버용 SSE 트랜스포트 임포트 from fastmcp.low_level.sse_server_transport import SseServerTransport # 올바른 fastmcp 임포트 from fastmcp.server import FastMCP from app.mongodb.client import connect_to_mongodb, close_mongodb from app.tools.registry import ToolRegistry # 환경 변수 로드 load_dotenv() # 환경 변수에서 설정 가져오기 database_url = os.getenv("MONGODB_URL", "mongodb://root:example@localhost:27017/admin") transport_type = os.getenv("MCP_TRANSPORT", "http").lower() # 수명 주기 관리자 @asynccontextmanager async def lifespan(app: FastAPI): """앱 수명 주기 관리""" # 시작 시 실행 await connect_to_mongodb(database_url) print(f"MongoDB MCP server running with {transport_type} transport") yield # 종료 시 실행 await close_mongodb() print("MongoDB MCP server shutdown complete") # FastAPI 앱 생성 app = FastAPI( title="MongoDB MCP Server", description="MongoDB MCP Server for AI Agents", version="0.1.0", lifespan=lifespan ) # FastMCP 인스턴스 생성 mcp = FastMCP( name="mongodb-mcp-bridge", version="0.1.0" ) # 도구 레지스트리 초기화 tool_registry = ToolRegistry() @app.get("/health") async def health_check(): """상태 체크 엔드포인트""" return { "status": "ok", "version": "0.1.0", "transport": transport_type, "database": database_url.split("@")[-1].split("/")[0] } # 도구 등록 for tool in tool_registry.get_all_tools(): try: # 시그니처에 맞게 수정: (fn, name=None, description=None, tags=None, annotations=None) if hasattr(mcp, 'add_tool'): # 첫 번째 인자로 함수를 전달하고, 나머지는 키워드 인자로 전달 if callable(tool.execute): mcp.add_tool( tool.execute, # fn 인자 - 첫 번째 위치 name=tool.name, description=tool.description ) print(f"도구 등록 성공: {tool.name}") else: print(f"도구 실행 함수가 호출 가능한 객체가 아닙니다: {tool.name}, 타입: {type(tool.execute)}") elif hasattr(mcp, 'register_tool'): if callable(tool.execute): mcp.register_tool( tool.execute, name=tool.name, description=tool.description ) print(f"도구 등록 성공: {tool.name}") else: print(f"도구 실행 함수가 호출 가능한 객체가 아닙니다: {tool.name}") else: print(f"도구 등록 메서드를 찾을 수 없습니다: {tool.name}") except Exception as e: print(f"도구 등록 중 오류: {e}") if hasattr(mcp, 'add_tool'): import inspect print(f"add_tool 메서드 시그니처: {inspect.signature(mcp.add_tool)}") # FastMCP 앱 연결 - 트랜스포트 타입에 따라 설정 try: print("FastMCP 앱 연결 시도") if transport_type == "sse": print("SSE 트랜스포트 모드로 설정됨") else: print("HTTP 트랜스포트 모드로 설정됨") # from_fastapi 메서드 사용 시도 (권장 방식) if hasattr(mcp, 'from_fastapi') and callable(mcp.from_fastapi): # FastMCP 앱을 FastAPI 앱에 연결 mcp.from_fastapi(app, prefix="/mcp") print("성공: from_fastapi 메서드로 FastMCP 앱 통합") # 대체 방식: 필요한 앱 마운트 else: # 앱 마운트 상태 추적 apps_mounted = 0 # HTTP 앱 마운트 if hasattr(mcp, 'http_app'): try: app.mount("/mcp", mcp.http_app) print("성공: HTTP 앱을 /mcp 경로에 마운트") apps_mounted += 1 except Exception as e: print(f"HTTP 앱 마운트 실패: {e}") # SSE 앱 마운트 if hasattr(mcp, 'sse_app'): try: app.mount("/mcp/sse", mcp.sse_app) print("성공: SSE 앱을 /mcp/sse 경로에 마운트") apps_mounted += 1 except Exception as e: print(f"SSE 앱 마운트 실패: {e}") # 스트리밍 HTTP 앱 마운트 if hasattr(mcp, 'streamable_http_app'): try: app.mount("/mcp/stream", mcp.streamable_http_app) print("성공: 스트리밍 HTTP 앱을 /mcp/stream 경로에 마운트") apps_mounted += 1 except Exception as e: print(f"스트리밍 HTTP 앱 마운트 실패: {e}") # 앱 마운트 결과 요약 if apps_mounted > 0: print(f"성공: {apps_mounted}개 FastMCP 앱 통합 완료") else: print("주의: FastMCP 앱을 마운트할 수 없습니다.") print("커스텀 엔드포인트(/sse, /messages)를 통해 기능이 제공됩니다.") except Exception as e: print(f"FastMCP 앱 연결 중 오류: {e}") import traceback traceback.print_exc() # SSE 엔드포인트 @app.get("/sse") async def sse_endpoint(request: Request): """SSE 엔드포인트""" try: # SseServerTransport 생성 transport = SseServerTransport(request) # FastMCP에 트랜스포트 연결 if hasattr(mcp, 'handle_connection'): await mcp.handle_connection(transport) elif hasattr(mcp, 'connect'): await mcp.connect(transport) else: print("트랜스포트 연결 메서드를 찾을 수 없습니다.") return JSONResponse( status_code=500, content={"error": "Transport connection method not found"} ) # 응답 반환 return transport.response except Exception as e: print(f"SSE 연결 처리 중 오류: {e}") import traceback traceback.print_exc() return JSONResponse( status_code=500, content={"error": f"SSE connection error: {str(e)}"} ) # 메시지 엔드포인트 @app.post("/messages") async def messages_endpoint(request: Request): """메시지 처리 엔드포인트""" try: data = await request.json() session_id = request.query_params.get("sessionId") # FastMCP로 메시지 처리 if hasattr(mcp, 'handle_message'): response = await mcp.handle_message(session_id, data) elif hasattr(mcp, 'process_message'): response = await mcp.process_message(session_id, data) else: print("메시지 처리 메서드를 찾을 수 없습니다.") return JSONResponse( status_code=500, content={"error": "Message processing method not found"} ) return JSONResponse(content=response) except Exception as e: print(f"메시지 처리 중 오류: {e}") import traceback traceback.print_exc() return JSONResponse( status_code=500, content={"error": f"Message processing error: {str(e)}"} )

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/TeiNam/mongo-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server