"""Token handling utilities for authentication."""
from mcp.server.fastmcp import Context
from ..utils.errors import MCPAuthenticationError
# Module-level store of per-session state dictionaries. Keys are the integer
# id(ctx.session) values computed in get_session_state(); values are plain
# dicts holding whatever that session has stored (e.g. "user_token").
_session_state = {}
def get_session_state(ctx: Context) -> dict:
    """
    Get the session state dictionary for the current session.

    State lives in the module-level ``_session_state`` dict under the key
    ``id(ctx.session)``. Because ``id()`` values are only unique among live
    objects, an entry that outlived its session could be picked up by a later,
    unrelated session that happens to receive the same id. To avoid that (and
    to keep the store from growing without bound), a finalizer is attached to
    the session the first time its entry is created so the entry is removed
    when the session object is garbage-collected.

    Args:
        ctx: MCP context; only ``ctx.session`` is read.

    Returns:
        The mutable state dictionary for this session, created empty on
        first access.
    """
    session = ctx.session
    session_id = id(session)
    if session_id not in _session_state:
        _session_state[session_id] = {}

        import weakref

        try:
            # Drop this session's entry as soon as the session object dies,
            # before its id can be handed out to a different session.
            finalizer = weakref.finalize(
                session, _session_state.pop, session_id, None
            )
        except TypeError:
            # Session type does not support weak references: keep the
            # previous behaviour (entry persists for the process lifetime).
            pass
        else:
            # Nothing to clean up at interpreter shutdown.
            finalizer.atexit = False
    return _session_state[session_id]
async def extract_user_token(ctx: Context) -> str:
    """
    Return the authentication token stored for the current MCP session.

    The token is read from the session state under the ``"user_token"`` key
    and returned with surrounding whitespace removed.

    Args:
        ctx: MCP context used to look up the session state.

    Returns:
        The stored token string, stripped of leading/trailing whitespace.

    Raises:
        MCPAuthenticationError: If no token is stored (or it is falsy), if
            the stored value is not a string, or if it contains only
            whitespace.
    """
    raw_token = get_session_state(ctx).get("user_token")

    # Any falsy value (None, "", 0, ...) is reported as "not logged in".
    if not raw_token:
        raise MCPAuthenticationError(
            "Authentication token is missing. Please login first using the 'login' tool."
        )

    if not isinstance(raw_token, str):
        type_name = type(raw_token).__name__
        raise MCPAuthenticationError(
            f"Authentication token must be a string. Received type: {type_name}"
        )

    # A whitespace-only token is truthy, so it needs its own check.
    cleaned = raw_token.strip()
    if not cleaned:
        raise MCPAuthenticationError("Authentication token cannot be empty.")
    return cleaned