"""
User Profile Tool - Get current user's information
This tool provides SAFE, FILTERED user information following OWASP best practices:
- No sensitive IDs or internal fields
- No timestamps or audit fields
- Permissions grouped by module
- Consistent, structured responses
- Accessible to all users (about themselves only)
Security: Follows OWASP API3:2019 - Excessive Data Exposure prevention
"""
from typing import Dict, Any, List
from dataclasses import dataclass
from mcp.server.fastmcp import FastMCP
from src.core.tool_registry import ContextAwareToolRegistry
from src.core.domain_policies import SYSTEM_POLICIES
from src.core import wrap_response
from src.observability import get_logger
# Module-level logger from the project's observability helper, keyed by this module's name.
logger = get_logger(__name__)
@dataclass
class UserProfileData:
    """
    Sanitized, display-level view of a user.

    Instances carry only the fields listed here. ``filter_user_data`` builds
    them from the raw API payload and ``format_user_profile_response`` turns
    them into the tool's response sections. Field order is part of the
    positional constructor signature and must not be rearranged.
    """

    # --- identity -----------------------------------------------------
    name: str    # display name
    email: str
    title: str
    role: str    # role name as text
    status: str

    # --- organization -------------------------------------------------
    organization_name: str
    organization_type: str

    # --- access -------------------------------------------------------
    # module name -> list of human-readable permission labels
    permissions: Dict[str, List[str]]

    # --- roll-up ------------------------------------------------------
    total_permissions: int
    modules_access: List[str]    # module names the user can reach
def filter_user_data(raw_data: Dict[str, Any]) -> UserProfileData:
    """
    Clean and normalize raw API user data into the SAFE profile format.

    Only the fields explicitly copied below reach the result; everything else
    in the raw payload is dropped. Keys that are missing *and* keys that are
    present with a null value are treated the same way and fall back to the
    defaults used here (``dict.get(key, default)`` alone would hand back
    ``None`` for an explicit null and crash the attribute access that follows).

    Args:
        raw_data: Either the full response ``{"data": {"user": {...}}}``
            or just the user object directly ``{...}``.

    Returns:
        A ``UserProfileData`` with the display name, contact fields, role,
        organization and the permissions grouped by module (each module's
        labels de-duplicated and sorted).
    """
    # Handle both formats: full response or direct user object.
    envelope = raw_data.get("data")
    if isinstance(envelope, dict) and "user" in envelope:
        user = envelope["user"] or {}
    else:
        user = raw_data

    # ``or`` (rather than a .get default) also covers explicit nulls.
    user_info = user.get("user_info") or {}
    user_org = user.get("user_organization") or {}
    user_role = user.get("user_role") or {}
    raw_permissions = user.get("user_permission") or []

    # Group permission labels by module; a set per module de-duplicates
    # without rescanning a list for every entry.
    labels_by_module: Dict[str, set] = {}
    for perm in raw_permissions:
        label = _format_permission_name(perm.get("name") or "")
        if not label:
            continue
        # Null or blank module names are filed under "Other".
        module = (perm.get("module_name") or "").strip() or "Other"
        labels_by_module.setdefault(module, set()).add(label)

    permissions_by_module: Dict[str, List[str]] = {
        module: sorted(labels) for module, labels in labels_by_module.items()
    }

    # Build the display name; null name parts must not render as "None".
    first_name = user_info.get("first_name") or ""
    last_name = user_info.get("last_name") or ""
    display_name = (
        f"{first_name} {last_name}".strip()
        or user.get("name")
        or "Unknown"
    )

    # Determine best organization name field.
    org_name = (
        user_org.get("organization_name")
        or user_org.get("name")
        or ""
    )

    return UserProfileData(
        name=display_name,
        email=user.get("email") or "",
        title=user_info.get("title") or "",
        role=user_role.get("name") or "User",
        # str() keeps .title() safe if the API sends a non-string value.
        status=str(user.get("status") or "active").title(),
        organization_name=org_name,
        organization_type=str(user_org.get("organization_type") or "single").title(),
        permissions=permissions_by_module,
        # Counts the raw entries, including duplicates and unnamed ones.
        total_permissions=len(raw_permissions),
        modules_access=sorted(permissions_by_module),
    )
# Resource key (lower-case) -> display label. Singular and plural spellings
# are both listed where the API is known to mix them.
_RESOURCE_LABELS = {
    "users": "Users",
    "user": "Users",
    "organization": "Organizations",
    "submodule": "Platforms/Dealerships",
    "data": "Data",
    "contract": "Contracts",
    "contracts": "Contracts",
    "invoice": "Invoices",
    "approval": "Approvals",
}

# Action key (lower-case) -> display verb phrase.
_ACTION_LABELS = {
    "index": "View",
    "show": "View Details",
    "create": "Create",
    "update": "Edit",
    "delete": "Delete",
    "dates": "View Dates",
    "clauses": "View Clauses",
    "data": "View Data",
    "summary": "View Summary",
    "files": "View Files",
    "owner": "View Owner",
    "renewal": "Manage Renewals",
    "paid": "Mark as Paid",
    "set_filters": "Set Filters",
    "global_filters": "Manage Global Filters",
}


def _format_permission_name(perm_name: str) -> str:
    """
    Convert a ``resource.action`` permission name to a user-friendly label.

    The name is split at the first dot. Both halves are stripped and
    lower-cased before lookup, so ``"Users.Index"`` and ``"users.index"``
    produce the same label. Halves that are not in the lookup tables fall
    back to a title-cased version of themselves (underscores in the action
    become spaces).

    Args:
        perm_name: Permission name such as ``"contracts.renewal"``.

    Returns:
        ``"<action label> <resource label>"``, e.g. ``"Manage Renewals
        Contracts"``. A falsy name, or one without a dot, is returned
        unchanged.
    """
    if not perm_name or "." not in perm_name:
        return perm_name

    resource, action = perm_name.split(".", 1)
    resource = resource.strip().lower()
    action = action.strip().lower()

    resource_text = _RESOURCE_LABELS.get(resource, resource.title())
    action_text = _ACTION_LABELS.get(action, action.replace("_", " ").title())
    return f"{action_text} {resource_text}"
def format_user_profile_response(profile: UserProfileData) -> Dict[str, Any]:
    """
    Lay a filtered profile out as the tool's fixed four-section response.

    The result always has the same top-level keys (``profile``,
    ``organization``, ``access``, ``summary``) so consumers see one stable
    shape regardless of the user.

    Args:
        profile: Filtered user profile data.

    Returns:
        Dictionary with the four sections described above. ``summary`` holds
        a one-sentence description plus the capability list produced by
        ``_generate_capabilities_summary``.
    """
    module_names = profile.modules_access

    identity_section = {
        "name": profile.name,
        "email": profile.email,
        "title": profile.title,
        "role": profile.role,
        "status": profile.status,
    }
    organization_section = {
        "name": profile.organization_name,
        "type": profile.organization_type,
    }
    access_section = {
        "total_permissions": profile.total_permissions,
        "modules": module_names,
        "permissions_by_module": profile.permissions,
    }

    # One-sentence, human-readable recap of who the user is and what they reach.
    description = f"{profile.name} is a {profile.role} at {profile.organization_name} with access to {len(module_names)} modules: {', '.join(module_names)}."
    summary_section = {
        "description": description,
        "can_do": _generate_capabilities_summary(profile),
    }

    return {
        "profile": identity_section,
        "organization": organization_section,
        "access": access_section,
        "summary": summary_section,
    }
def _generate_capabilities_summary(profile: "UserProfileData") -> List[str]:
    """
    Derive a short list of plain-language capabilities from a profile.

    Each known module contributes one sentence when the user has access to
    it. Module names are compared case-insensitively, ignoring surrounding
    whitespace, and both the singular and plural spelling of a module are
    accepted. A role containing "global admin" adds a final
    administrative-access sentence.

    Args:
        profile: Object exposing ``modules_access`` (iterable of module
            names) and ``role`` (role name, may be empty).

    Returns:
        Capability sentences in a fixed order; empty when nothing matches.
    """
    granted = {name.strip().lower() for name in profile.modules_access}

    # (accepted module spellings, capability sentence), in output order.
    module_capabilities = (
        (("user", "users"), "Manage users and user accounts"),
        (("organization", "organizations"), "Manage organizations, platforms, and dealerships"),
        (("data",), "Access and manage organizational data"),
        (("contract", "contracts"), "View and manage vendor contracts"),
        (("invoice", "invoices"), "Handle invoices, billing, and payments"),
        (("approval", "approvals"), "Review and approve contract workflows"),
    )

    capabilities = [
        sentence
        for spellings, sentence in module_capabilities
        if not granted.isdisjoint(spellings)
    ]

    if "global admin" in (profile.role or "").lower():
        capabilities.append("Full administrative access")
    return capabilities
async def get_user_profile(
    user_context: FastMCP,
    **kwargs
) -> Dict[str, Any]:
    """
    Get current user's profile information.

    Looks up the cached user record for the caller's bearer token, reduces it
    to the safe field set with ``filter_user_data`` and wraps the formatted
    result with ``wrap_response``. Every failure path returns the same
    envelope shape with all four sections set to ``None`` and a
    ``missing_reason``.

    Internal exception text is written to the log only; it is never placed in
    the response, so implementation details are not disclosed to the caller.

    NOTE(review): the parameter is annotated ``FastMCP`` but the body reads
    ``user_id``, ``role.name`` and ``bearer_token`` from it, which matches the
    ``UserContext`` object built in ``register`` - confirm and correct the
    annotation.

    Args:
        user_context: Current user's context; must expose ``user_id``,
            ``role.name`` and ``bearer_token``.
        **kwargs: Only ``trace_id`` is read (defaults to ``"unknown"``);
            anything else is ignored.

    Returns:
        Envelope produced by ``wrap_response`` with the filtered profile, or
        with empty sections and a ``missing_reason`` when the profile could
        not be produced.
    """
    trace_id = kwargs.get("trace_id", "unknown")

    def _unavailable(reason: str) -> Dict[str, Any]:
        # Same section keys as the success payload, all empty, plus the reason.
        return wrap_response(
            tool_name="get_user_profile",
            trace_id=trace_id,
            data={
                "profile": None,
                "organization": None,
                "access": None,
                "summary": None,
            },
            missing_reason=reason,
        )

    logger.info(
        "user_profile_requested",
        trace_id=trace_id,
        user_id=user_context.user_id,
        role=user_context.role.name
    )

    try:
        # Imported lazily inside the function, as in the original code.
        # Per the original author's note the cache holds the data from
        # /api/get-login-user-data, fetched during auth.
        from src.web_chat.user_cache import get_user_from_token

        if not user_context.bearer_token:
            return _unavailable("No authentication token available")

        cached_user = await get_user_from_token(user_context.bearer_token)
        if not cached_user or not cached_user.raw_data:
            return _unavailable("User data not available")

        # Filter the raw data to safe fields, then format consistently.
        profile = filter_user_data(cached_user.raw_data)
        response = format_user_profile_response(profile)

        logger.info(
            "user_profile_returned",
            trace_id=trace_id,
            user_id=user_context.user_id,
            modules_count=len(profile.modules_access),
            permissions_count=profile.total_permissions
        )
        return wrap_response(
            tool_name="get_user_profile",
            trace_id=trace_id,
            data=response,
        )
    except Exception as e:
        # Boundary handler: record the details for operators...
        logger.error(
            "user_profile_failed",
            trace_id=trace_id,
            user_id=user_context.user_id,
            error=str(e),
            error_type=type(e).__name__
        )
        # ...but hand the caller a generic reason only.
        return _unavailable(
            "Failed to retrieve user profile due to an internal error"
        )
# Tool registration
def register(registry: ContextAwareToolRegistry) -> None:
    """
    Register the user profile tool with the MCP server.

    Defines the ``get_user_profile`` tool on ``registry`` under the policy
    stored in ``SYSTEM_POLICIES["get_user_profile"]``.

    Args:
        registry: Tool registry that receives the tool definition.
    """

    @registry.register_tool(
        name="get_user_profile",
        policy=SYSTEM_POLICIES["get_user_profile"],
        description=(
            "Get the current user's profile information including their NAME, email, role, organization, "
            "and permissions. This tool shows what modules and features the user has access to. "
            "ALWAYS use this tool when the user asks about their identity or account. "
            "Examples: 'What is my name?', 'Who am I?', 'What is my email?', 'What is my role?', "
            "'What can I do?', 'What permissions do I have?', 'What modules can I access?'"
        )
    )
    async def get_user_profile_tool(
        user_id: int = 0,
        organization_id: int = 0,
        role: int = 1,
        bearer_token: str = "",
        email: str = "",
        name: str = "",
        platform_id: int = None,
        dealership_id: int = None,
        permissions: list = None,
        trace_id: str = "unknown",
    ) -> Dict[str, Any]:
        """
        Get user profile information.

        Builds a user context from the parameters and delegates to
        ``get_user_profile``.

        Args:
            user_id: User ID (auto-injected)
            organization_id: Organization ID (auto-injected)
            role: Numeric role value, converted with ``Role(role)``
            bearer_token: Token used to look up the cached user record
            email: User's email address
            name: User's display name
            platform_id: Optional platform ID
            dealership_id: Optional dealership ID
            permissions: Optional permission list; ``None`` becomes ``[]``
            trace_id: Correlation ID forwarded to ``get_user_profile``

        Returns:
            User profile with safe, filtered information
        """
        # NOTE(review): UserContext was used here without being imported
        # anywhere in this module, so every call would have raised NameError.
        # It is imported next to Role on the assumption that src.core exports
        # it - confirm the correct module path.
        from src.core import Role, UserContext

        # Create user context from injected parameters.
        user_context = UserContext(
            user_id=user_id,
            role=Role(role),
            bearer_token=bearer_token,
            organization_id=organization_id,
            platform_id=platform_id,
            dealership_id=dealership_id,
            email=email,
            name=name,
            permissions=permissions or [],
        )
        return await get_user_profile(user_context, trace_id=trace_id)