# auth.py
"""Authentication tools for B4B MCP server."""
import httpx
import structlog
from mcp.server.fastmcp import Context
from ..auth.token_handler import extract_user_token, get_session_state
from ..client.api_client import get_api_client
from ..server import mcp
logger = structlog.get_logger(__name__)
@mcp.tool()
async def login(phone: str, password: str, ctx: Context) -> dict:
    """
    Log in to the B4B API and establish an authenticated session.

    Call this tool before any tool that requires authentication. On success
    the access token returned by ``/auth/login`` is stored in the session
    state under ``user_token``, together with whichever of the refresh
    token, e-mail, user id and super-admin flag the response contains.

    Args:
        phone: User's phone number (e.g., "+84900000000")
        password: User's password
        ctx: MCP context (automatically provided)

    Returns:
        On success:
            {
                "success": True,
                "message": "...",
                "user_id": ...,
                "email": ...,
                "is_super_admin": True/False
            }
        On failure:
            {
                "success": False,
                "error": "..."
            }

    Example:
        await login("+84900000000", "<password>")
    """
    log = logger.bind(action="login", phone=phone)
    log.info("Starting login")
    try:
        # Get the API client and its internal HTTP client.
        # NOTE(review): this relies on the client's private `_get_client()`
        # helper; a public accessor would be preferable if one exists.
        api_client = get_api_client()
        client = await api_client._get_client()

        # Make login request (no auth token needed for login)
        response = await client.post(
            "/auth/login",
            json={"phone": phone, "password": password},
        )
        response.raise_for_status()

        # Parse response
        data = response.json()

        # Without an access token there is nothing to authenticate with;
        # report that explicitly instead of surfacing a bare KeyError text.
        access_token = data.get("access_token") if isinstance(data, dict) else None
        if not access_token:
            log.error("Login response did not contain an access token")
            return {
                "success": False,
                "error": "Login failed: response did not contain an access token",
            }

        # Store tokens and user info in session state
        state = get_session_state(ctx)

        # Drop optional fields left over from an earlier login so they cannot
        # be attributed to the user who is logging in now.
        # (assumes the session state is dict-like, as its other uses suggest)
        for stale_key in ("refresh_token", "user_email", "user_id"):
            state.pop(stale_key, None)

        state["user_token"] = access_token
        if "refresh_token" in data:
            state["refresh_token"] = data["refresh_token"]
        if "email" in data:
            state["user_email"] = data["email"]
        if "id" in data:
            state["user_id"] = data["id"]
        is_super_admin = data.get("is_super_admin", False)
        state["is_super_admin"] = is_super_admin

        user_id = data.get("id")
        email = data.get("email")
        log.info(
            "Login successful",
            user_id=user_id,
            email=email,
            is_super_admin=is_super_admin,
        )
        return {
            "success": True,
            "message": f"Successfully logged in as {data.get('email', 'user')}",
            "user_id": user_id,
            "email": email,
            "is_super_admin": is_super_admin,
        }
    except httpx.HTTPStatusError as e:
        log.error("Login failed with HTTP error", status_code=e.response.status_code)
        # Best-effort extraction of a human-readable reason from the error
        # body; any problem decoding it falls back to the raw response text.
        error_detail = ""
        try:
            error_body = e.response.json()
            error_detail = error_body.get("detail", "") or error_body.get("message", "")
        except Exception:
            error_detail = e.response.text
        return {
            "success": False,
            "error": f"Login failed: {error_detail or f'HTTP {e.response.status_code}'}"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        log.error("Unexpected error during login", error=str(e))
        return {
            "success": False,
            "error": f"Unexpected error during login: {str(e)}"
        }
@mcp.tool()
async def logout(ctx: Context) -> dict:
    """
    End the current session by wiping everything stored in its state.

    The stored e-mail and user id are read first (defaulting to "unknown")
    so they can be logged and echoed back, then the whole session state is
    cleared. Call 'login' again afterwards to obtain a new session.

    Args:
        ctx: MCP context (automatically provided)

    Returns:
        {
            "success": True/False,
            "message": "...", (on success)
            "error": "..." (on failure)
        }
    """
    bound_log = logger.bind(action="logout")
    try:
        session = get_session_state(ctx)

        # Snapshot the identity before it is erased below.
        previous_email = session.get("user_email", "unknown")
        previous_id = session.get("user_id", "unknown")

        session.clear()

        bound_log.info(
            "Logout successful",
            user_id=previous_id,
            user_email=previous_email,
        )
        outcome = {
            "success": True,
            "message": f"Successfully logged out (was: {previous_email})",
        }
        return outcome
    except Exception as exc:
        # Tool boundary: turn any failure into an error payload.
        bound_log.error("Error during logout", error=str(exc))
        failure = {
            "success": False,
            "error": f"Error during logout: {str(exc)}",
        }
        return failure
@mcp.tool()
async def whoami(ctx: Context) -> dict:
    """
    Get information about the currently authenticated user.

    Reads the user token from the session and requests ``/users/me`` from
    the API with it; the response body is returned unchanged under "user".

    Args:
        ctx: MCP context (automatically provided)

    Returns:
        {
            "success": True/False,
            "user": {...} (on success)
            "error": "..." (on failure)
        }
    """
    log = logger.bind(action="whoami")
    try:
        # Only the token lookup is treated as an authentication check.
        # Keeping this handler narrow prevents unrelated ValueErrors raised
        # later (e.g. a JSON decode error, which subclasses ValueError) from
        # being misreported as "not authenticated".
        # NOTE(review): presumes extract_user_token raises ValueError when no
        # token is stored - confirm against token_handler.
        try:
            token = await extract_user_token(ctx)
        except ValueError as e:
            log.warning("Not authenticated", error=str(e))
            return {
                "success": False,
                "error": "Not authenticated. Please login first."
            }

        # Get API client
        api_client = get_api_client()

        # Call the /users/me endpoint
        response_data = await api_client.get("/users/me", token=token)

        log.info(
            "User info retrieved",
            user_id=response_data.get("id"),
            email=response_data.get("email"),
        )
        return {
            "success": True,
            "user": response_data
        }
    except httpx.HTTPStatusError as e:
        log.error("HTTP error getting user info", status_code=e.response.status_code)
        return {
            "success": False,
            "error": f"Failed to get user info: HTTP {e.response.status_code}"
        }
    except Exception as e:
        # Tool boundary: report the failure to the caller instead of raising.
        log.error("Unexpected error getting user info", error=str(e))
        return {
            "success": False,
            "error": f"Unexpected error: {str(e)}"
        }