Skip to main content
Glama

Schwab Model Context Protocol Server

by jkoelker
server.py2.32 kB
from __future__ import annotations import logging import sys from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from typing import AsyncContextManager, Callable, Optional import mcp.types as types from mcp.server.fastmcp import FastMCP from schwab.client import AsyncClient from schwab_mcp.tools import register_tools from schwab_mcp.context import SchwabServerContext logger = logging.getLogger(__name__) def _client_lifespan( client: AsyncClient, ) -> Callable[[FastMCP], AsyncContextManager[SchwabServerContext]]: """Create a FastMCP lifespan context that exposes the Schwab async client.""" @asynccontextmanager async def lifespan(_: FastMCP) -> AsyncGenerator[SchwabServerContext, None]: context = SchwabServerContext(client=client) try: yield context finally: try: await client.close_async_session() except Exception: logger.exception( "Failed to close Schwab async client session during shutdown." ) return lifespan class SchwabMCPServer: """Schwab Model Context Protocol server backed by FastMCP.""" def __init__( self, name: str, client: AsyncClient, jesus_take_the_wheel: bool = False, ) -> None: self._server = FastMCP(name=name, lifespan=_client_lifespan(client)) register_tools(self._server, client, allow_write=jesus_take_the_wheel) async def run(self) -> None: """Run the server using FastMCP's stdio transport.""" await self._server.run_stdio_async() def send_error_response( error_message: str, code: int = 401, details: Optional[dict] = None ) -> None: """ Send a proper MCP error response to stdout and exit. This function can be used before the server is started to return error responses in the proper MCP format. """ if details is None: details = {} error_data = types.ErrorData(code=code, message=error_message, data=details) response = types.JSONRPCError( jsonrpc="2.0", id="pre-initialization", error=error_data, ) json_response = response.model_dump_json() sys.stdout.write(f"{json_response}\n") sys.stdout.flush() sys.exit(1)

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/jkoelker/schwab-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server