Skip to main content
Glama
main.py26.4 kB
import json import logging import secrets import sys import time from datetime import datetime from pathlib import Path import structlog import uvicorn from fastapi import FastAPI, HTTPException, Request, Response from fastapi.responses import RedirectResponse from fastmcp import FastMCP from fastmcp.server.auth.auth import OAuthProvider from fulcra_api.core import FulcraAPI from mcp.server.auth.middleware.auth_context import get_access_token from mcp.server.auth.provider import ( AccessToken, AuthorizationCode, AuthorizationParams, RefreshToken, construct_redirect_uri, ) from mcp.server.auth.settings import ClientRegistrationOptions, RevocationOptions from mcp.server.session import ServerSession from mcp.shared.auth import OAuthClientInformationFull, OAuthToken from pydantic import AnyHttpUrl from pydantic_settings import BaseSettings OIDC_SCOPES = ["openid", "profile", "name", "email"] class Settings(BaseSettings): state_path: Path = Path("state/").resolve() oidc_server_url: str = "http://localhost:4499" fulcra_environment: str = "stdio" port: int = 4499 oidc_client_id: str | None = None fulcra_oidc_domain: str | None = None fulcra_api: str | None = None settings = Settings() logger = structlog.getLogger(__name__) if settings.fulcra_environment == "localdev": logging.basicConfig(format="%(message)s", stream=sys.stderr, level=logging.DEBUG) class FulcraOAuthProvider(OAuthProvider): def __init__( self, issuer_url: AnyHttpUrl | str, service_documentation_url: AnyHttpUrl | str | None = None, client_registration_options: ClientRegistrationOptions | None = None, revocation_options: RevocationOptions | None = None, required_scopes: list[str] | None = None, ): super().__init__( base_url=settings.oidc_server_url, issuer_url=issuer_url, service_documentation_url=service_documentation_url, client_registration_options=client_registration_options, revocation_options=revocation_options, required_scopes=required_scopes, ) self.auth_codes: dict[str, AuthorizationCode] = {} self.tokens: dict[str, AccessToken] = {} self.state_mapping: dict[str, dict[str, str]] = {} self.token_mapping: dict[str, str] = {} async def get_client(self, client_id: str) -> OAuthClientInformationFull | None: """Get OAuth client information.""" client_filepath = (settings.state_path / f"{client_id}.json").resolve() if not client_filepath.is_relative_to(settings.state_path): return None try: with client_filepath.open(mode="r") as c: return OAuthClientInformationFull.model_validate_json(c.read()) except FileNotFoundError: return None except Exception as exc: logger.error("Caught exception while loading client info", exc_info=exc) return None async def register_client(self, client_info: OAuthClientInformationFull): """Register a new OAuth client.""" client_filepath = ( settings.state_path / f"{client_info.client_id}.json" ).resolve() if not client_filepath.is_relative_to(settings.state_path): return None try: with client_filepath.open(mode="w") as c: c.write(client_info.model_dump_json()) except Exception as exc: logger.error("Caught exception while writing client info", exc_info=exc) async def authorize( self, client: OAuthClientInformationFull, params: AuthorizationParams ) -> str: state = params.state or secrets.token_hex(16) self.state_mapping[state] = { "redirect_uri": str(params.redirect_uri), "code_challenge": params.code_challenge, "redirect_uri_provided_explicitly": str( params.redirect_uri_provided_explicitly ), "client_id": client.client_id, } fulcra = FulcraAPI( oidc_client_id=settings.oidc_client_id, oidc_domain=settings.fulcra_oidc_domain, oidc_audience=settings.fulcra_api, ) auth_url = fulcra.get_authorization_code_url( redirect_uri=f"{settings.oidc_server_url}/callback", state=state, ) return auth_url async def handle_callback(self, code: str, state: str) -> str: state_data = self.state_mapping.get(state) if not state_data: raise HTTPException(400, "Invalid state parameter") redirect_uri = state_data["redirect_uri"] code_challenge = state_data["code_challenge"] redirect_uri_provided_explicitly = ( state_data["redirect_uri_provided_explicitly"] == "True" ) client_id = state_data["client_id"] fulcra = FulcraAPI( oidc_client_id=settings.oidc_client_id, ) try: fulcra.authorize_with_authorization_code( code=code, redirect_uri=f"{settings.oidc_server_url}/callback", ) access_token = fulcra.get_cached_access_token() new_code = f"mcp_{secrets.token_hex(16)}" # Create MCP authorization code auth_code = AuthorizationCode( code=new_code, client_id=client_id, redirect_uri=AnyHttpUrl(redirect_uri), redirect_uri_provided_explicitly=redirect_uri_provided_explicitly, expires_at=time.time() + 300, scopes=OIDC_SCOPES, code_challenge=code_challenge, ) self.auth_codes[new_code] = auth_code self.tokens[access_token] = AccessToken( token=access_token, client_id=client_id, scopes=OIDC_SCOPES, expires_at=None, ) except Exception as e: logger.error("oauth2 code exchange failure", exc_info=e) raise HTTPException(400, "failed to exchange code for token") del self.state_mapping[state] return construct_redirect_uri(redirect_uri, code=new_code, state=state) async def load_authorization_code( self, client: OAuthClientInformationFull, authorization_code: str ) -> AuthorizationCode | None: """Load an authorization code.""" return self.auth_codes.get(authorization_code) async def exchange_authorization_code( self, client: OAuthClientInformationFull, authorization_code: AuthorizationCode ) -> OAuthToken: if authorization_code.code not in self.auth_codes: raise ValueError("Invalid authorization code") # Generate MCP access token mcp_token = f"mcp_{secrets.token_hex(32)}" # Store MCP token self.tokens[mcp_token] = AccessToken( token=mcp_token, client_id=client.client_id, scopes=authorization_code.scopes, expires_at=int(time.time()) + 3600, ) # Find GitHub token for this client oidc_token = next( ( token for token, data in self.tokens.items() # see https://github.blog/engineering/platform-security/behind-githubs-new-authentication-token-formats/ # which you get depends on your GH app setup. if data.client_id == client.client_id ), None, ) if oidc_token: self.token_mapping[mcp_token] = oidc_token del self.auth_codes[authorization_code.code] return OAuthToken( access_token=mcp_token, token_type="bearer", expires_in=3600, scope=" ".join(authorization_code.scopes), ) async def load_access_token(self, token: str) -> AccessToken | None: """Load and validate an access token.""" access_token = self.tokens.get(token) if not access_token: return None # Check if expired if access_token.expires_at and access_token.expires_at < time.time(): del self.tokens[token] return None return access_token async def load_refresh_token( self, client: OAuthClientInformationFull, refresh_token: str ) -> RefreshToken | None: """Load a refresh token - not supported.""" return None async def exchange_refresh_token( self, client: OAuthClientInformationFull, refresh_token: RefreshToken, scopes: list[str], ) -> OAuthToken: """Exchange refresh token""" raise NotImplementedError("Not supported") async def revoke_token( self, token: str, token_type_hint: str | None = None ) -> None: """Revoke a token.""" if token in self.tokens: del self.tokens[token] oauth_provider = FulcraOAuthProvider( issuer_url=AnyHttpUrl(settings.oidc_server_url), client_registration_options=ClientRegistrationOptions( enabled=True, valid_scopes=OIDC_SCOPES, default_scopes=OIDC_SCOPES, ), required_scopes=["openid"], ) mcp = FastMCP( name="Fulcra Context Agent", instructions=""" This server provides personal data retrieval tools. Always specify the time zone when using times as parameters. """, auth=oauth_provider, ) stdio_fulcra: FulcraAPI | None = None def get_fulcra_object() -> FulcraAPI: global stdio_fulcra if settings.fulcra_environment == "stdio": if stdio_fulcra is not None: return stdio_fulcra else: stdio_fulcra = FulcraAPI() stdio_fulcra.authorize() return stdio_fulcra mcp_access_token = get_access_token() if not mcp_access_token: raise HTTPException(401, "Not authenticated") fulcra_token = oauth_provider.token_mapping.get(mcp_access_token.token) if fulcra_token is None: raise HTTPException(401, "Not authenticated") fulcra = FulcraAPI() fulcra.set_cached_access_token(fulcra_token) return fulcra @mcp.tool() async def get_workouts(start_time: datetime, end_time: datetime) -> str: """Get details about the workouts that the user has done during a period of time. Result timestamps will include time zones. Always translate timestamps to the user's local time zone when this is known. Args: start_time: The starting time of the period. Must include tz (ISO8601). end_time: the ending time of the period. Must include tz (ISO8601). """ fulcra = get_fulcra_object() workouts = fulcra.apple_workouts(start_time, end_time) return f"Workouts during {start_time} and {end_time}: " + json.dumps(workouts) @mcp.tool() async def get_metrics_catalog() -> str: """Get the catalog of available metrics that can be used in time-series API calls (`metric_time_series` and `metric_samples`). """ fulcra = get_fulcra_object() catalog = fulcra.metrics_catalog() return "Available metrics: " + json.dumps(catalog) @mcp.tool() async def get_metric_time_series( metric_name: str, start_time: datetime, end_time: datetime, sample_rate: float | None = 60.0, replace_nulls: bool | None = False, calculations: list[str] | None = None, ) -> str: """Get user's time-series data for a single Fulcra metric. Covers the time starting at start_time (inclusive) until end_time (exclusive). Result timestamps will include tz. Always translate timestamps to the user's local tz when this is known. Args: metric_name: The name of the time-series metric to retrieve. Use `get_metrics_catalog` to find available metrics. start_time: The starting time period (inclusive). Must include tz (ISO8601). end_time: The ending time (exclusive). Must include tz (ISO8601). sample_rate: Optional. The number of seconds per sample. Default is 60. Can be smaller than 1. replace_nulls: Optional. When true, replace all NA with 0. Default is False. calculations: Optional. A list of additional calculations to perform for each time slice. Not supported on cumulative metrics. Options: "max", "min", "delta", "mean", "uniques", "allpoints", "rollingmean". Returns: A JSON string representing a list of data points for the metric. For time ranges where data is missing, the values will be NA unless replace_nulls is true. """ fulcra = get_fulcra_object() # Ensure defaults are passed correctly if None kwargs = {} if sample_rate is not None: kwargs["sample_rate"] = sample_rate if replace_nulls is not None: kwargs["replace_nulls"] = replace_nulls if calculations is not None: kwargs["calculations"] = calculations time_series_df = fulcra.metric_time_series( metric=metric_name, start_time=start_time, end_time=end_time, **kwargs, ) return ( f"Time series data for {metric_name} from {start_time} to {end_time}: " + time_series_df.to_json( orient="records", date_format="iso", default_handler=str ) ) @mcp.tool() async def get_metric_samples( metric_name: str, start_time: datetime, end_time: datetime, ) -> str: """Retrieve the raw samples related to a given metric for the user during a specified period. In cases where samples cover ranges and not points in time, a sample will be returned if any part of its range intersects with the requested range. For example, if start_time is 14:00 and end_time is 15:00, a sample covering 13:30-14:30 will be included. Result timestamps will include time zones. Always translate timestamps to the user's local time zone when this is known. Args: metric_name: The name of the metric to retrieve samples for. Use `get_metrics_catalog` to find available metrics. start_time: The start of the time range (inclusive), as an ISO 8601 string or datetime object. end_time: The end of the time range (exclusive), as an ISO 8601 string or datetime object. Returns: A JSON string representing a list of raw samples for the metric. """ fulcra = get_fulcra_object() samples = fulcra.metric_samples( metric=metric_name, start_time=start_time, end_time=end_time, ) return ( f"Raw samples for {metric_name} from {start_time} to {end_time}: " + json.dumps(samples) ) @mcp.tool() async def get_sleep_cycles( start_time: datetime, end_time: datetime, cycle_gap: str | None = None, stages: list[int] | None = None, gap_stages: list[int] | None = None, clip_to_range: bool | None = True, ) -> str: """Return sleep cycles summarized from sleep stages. Processes raw sleep data samples into sleep cycles by finding gaps in the sleep sample data within a specified time interval. Result timestamps will include time zones. Always translate timestamps to the user's local time zone when this is known. Args: start_time: The starting timestamp (inclusive), as an ISO 8601 string or datetime object. end_time: The ending timestamp (exclusive), as an ISO 8601 string or datetime object. cycle_gap: Optional. Minimum time interval separating distinct cycles (e.g., "PT2H" for 2 hours). Defaults to server-side default if not provided. stages: Optional. Sleep stages to include. Defaults to all stages if not provided. gap_stages: Optional. Sleep stages to consider as gaps in sleep cycles. Defaults to server-side default if not provided. clip_to_range: Optional. Whether to clip the data to the requested date range. Defaults to True. Returns: A JSON string representing a pandas DataFrame containing the sleep cycle data. """ fulcra = get_fulcra_object() kwargs = {} if cycle_gap is not None: kwargs["cycle_gap"] = cycle_gap if stages is not None: kwargs["stages"] = stages if gap_stages is not None: kwargs["gap_stages"] = gap_stages if clip_to_range is not None: kwargs["clip_to_range"] = clip_to_range sleep_cycles_df = fulcra.sleep_cycles( start_time=start_time, end_time=end_time, **kwargs, ) # Convert DataFrame to JSON. `orient='records'` gives a list of dicts. # `date_format='iso'` ensures datetimes are ISO8601 strings. return f"Sleep cycles from {start_time} to {end_time}: " + sleep_cycles_df.to_json( orient="records", date_format="iso", default_handler=str ) @mcp.tool() async def get_location_at_time( time: datetime, window_size: int = 14400, reverse_geocode: bool | None = False, ) -> str: """Gets the user's location at the given time. If no sample is available for the exact time, searches for the closest one up to window_size seconds back. Result timestamps will include time zones. Always translate timestamps to the user's local time zone when this is known. Args: time: The point in time to get the user's location for. Must include tz (ISO8601). window_size: Optional. The size (in seconds) to look back (and optionally forward) for samples. Defaults to 14400. include_after: Optional. When true, a sample that occurs after the requested time may be returned if it is the closest one. Defaults to False. Returns: A JSON string representing the location data. """ fulcra = get_fulcra_object() kwargs = {} if window_size is not None: kwargs["window_size"] = window_size kwargs["include_after"] = True kwargs["reverse_geocode"] = True location_data = fulcra.location_at_time( time=time, **kwargs, ) return f"Location info at {time}: " + json.dumps(location_data) @mcp.tool() async def get_location_time_series( start_time: datetime, end_time: datetime, change_meters: float | None = None, sample_rate: int | None = 900, reverse_geocode: bool | None = False, ) -> str: """Retrieve a time series of locations that the user was at. Result timestamps will include time zones. Always translate timestamps to the user's local tz when this is known. Args: start_time: The start of the time range (inclusive), as an ISO 8601 string or datetime object. end_time: The end of the range (exclusive), as an ISO 8601 string or datetime object. change_meters: Optional. When specified, subsequent samples that are fewer than this many meters away will not be included. sample_rate: Optional. The length (in seconds) of each sample. Default is 900. reverse_geocode: Optional. When true, Fulcra will attempt to reverse geocode the locations and include the details in the results. Default is False. Returns: A JSON string representing a list of location data points. """ fulcra = get_fulcra_object() kwargs = {} if change_meters is not None: kwargs["change_meters"] = change_meters if sample_rate is not None: kwargs["sample_rate"] = sample_rate kwargs["look_back"] = 14400 if reverse_geocode is not None: kwargs["reverse_geocode"] = reverse_geocode location_series = fulcra.location_time_series( start_time=start_time, end_time=end_time, **kwargs, ) return f"Location time series from {start_time} to {end_time}: " + json.dumps( location_series ) @mcp.tool() async def get_user_info() -> str: """Return general info about the Context by Fulcra user. Returns user references such as time zone, calendar ids, and other metadata. """ fulcra = get_fulcra_object() user_info = fulcra.get_user_info() return "User information: " + json.dumps(user_info) mcp_asgi_app = mcp.http_app(path="/") app = FastAPI(lifespan=mcp_asgi_app.lifespan, debug=True) @app.get("/callback") async def callback_handler(request: Request) -> Response: code = request.query_params.get("code") state = request.query_params.get("state") if not code or not state: raise HTTPException(400, "Missing code or state parameter") try: redirect_uri = await oauth_provider.handle_callback(code, state) return RedirectResponse(status_code=302, url=redirect_uri) except HTTPException: raise except Exception as e: logger.error("Unexpected error", exc_info=e) raise HTTPException(500, "Unexpected error") # OpenAI sends an invalid token_endpoint_auth_method, so we ignore that with this # middleware. class OpenAIWorkaroundMiddleware: def __init__(self, app): self.app = app async def __call__(self, scope, receive, send): if scope["type"] != "http": await self.app(scope, receive, send) return # logger.info(f"Request received (ASGI): method={scope['method']}, path={scope['path']}") if scope["path"] == "/register": logger.info( "Intercepted /register request (ASGI). Attempting to modify 'token_endpoint_auth_method'." ) body_chunks = [] more_body = True while more_body: message = await receive() if message["type"] != "http.request": logger.warning( f"Unexpected ASGI message type '{message['type']}' received while reading body for /register." ) if ( not body_chunks and message.get("body") is None ): # No body part in first message logger.warning( "No body found in first message for /register. Bypassing modification." ) # Need to make sure the message we just consumed is passed on async def pass_through_receive(): yield message # The message we just consumed while True: yield await ( receive() ) # Subsequent messages from original stream await self.app(scope, pass_through_receive(), send) return body_chunks.append(message.get("body", b"")) more_body = message.get("more_body", False) if message["type"] == "http.disconnect": # Client disconnected logger.warning( "Client disconnected while reading body for /register." ) return original_body_bytes = b"".join(body_chunks) new_body_bytes = original_body_bytes if original_body_bytes: try: body_str = original_body_bytes.decode("utf-8") data = json.loads(body_str) if ( isinstance(data, dict) and data.get("token_endpoint_auth_method") == "client_secret_basic" ): data["token_endpoint_auth_method"] = "client_secret_post" new_body_bytes = json.dumps(data).encode("utf-8") # Removed the line: new_body_bytes = bytes() which was a bug logger.info( "Successfully modified 'token_endpoint_auth_method' to 'client_secret_post' for /register request (ASGI)." ) else: logger.info( "'token_endpoint_auth_method' was not 'client_secret_basic' or key not present in /register request body (ASGI). No changes made." ) except json.JSONDecodeError: logger.warning( "Request body for /register was not valid JSON (ASGI). Proceeding with original body.", exc_info=True, ) except UnicodeDecodeError: logger.warning( "Request body for /register could not be decoded as UTF-8 (ASGI). Proceeding with original body.", exc_info=True, ) except Exception: logger.error( "An unexpected error occurred while trying to modify the request body for /register (ASGI). Proceeding with original body.", exc_info=True, ) else: logger.info( "Request body for /register is empty (ASGI). No modification attempted." ) sent_synthetic_body = False async def new_receive_for_app(): nonlocal sent_synthetic_body if not sent_synthetic_body: sent_synthetic_body = True return { "type": "http.request", "body": new_body_bytes, "more_body": False, } else: return await receive() await self.app(scope, new_receive_for_app, send) else: await self.app(scope, receive, send) app.add_middleware(OpenAIWorkaroundMiddleware) app.mount("/", mcp_asgi_app) old__received_request = ServerSession._received_request async def _received_request(self, *args, **kwargs): try: return await old__received_request(self, *args, **kwargs) except RuntimeError: pass # pylint: disable-next=protected-access ServerSession._received_request = _received_request def main(): if settings.fulcra_environment == "stdio": mcp.run() else: settings.state_path.mkdir(parents=True, exist_ok=True) uvicorn.run(app, host="0.0.0.0", port=settings.port) if __name__ == "__main__": main()

Implementation Reference

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fulcradynamics/fulcra-context-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server