#!/usr/bin/env python3
"""
Looker Admin MCP - User management tools.
Provides user CRUD operations and role assignment.
"""
import logging
from typing import Dict, Any, List, Optional
import looker_sdk
from looker_sdk import models40 as models
from looker_admin_mcp.server.looker import mcp, handle_sdk_errors, get_sdk
logger = logging.getLogger('looker-admin-mcp.tools.user')
@handle_sdk_errors
async def looker_me() -> Dict[str, Any]:
"""Get information about the currently authenticated Looker user."""
sdk = get_sdk()
logger.info("Calling sdk.me()")
user = sdk.me()
logger.info(f"SDK me() call successful for user: {user.email}")
return {
"id": user.id,
"first_name": user.first_name,
"last_name": user.last_name,
"email": user.email,
"is_disabled": user.is_disabled,
"role_ids": user.role_ids,
"group_ids": user.group_ids,
"personal_space_id": user.personal_space_id,
"home_space_id": user.home_space_id,
"status": "Connected successfully to Looker API"
}
@handle_sdk_errors
async def list_users(
page: Optional[int] = None,
per_page: Optional[int] = 50,
sorts: Optional[str] = None,
ids: Optional[str] = None
) -> Dict[str, Any]:
"""
List all users with optional pagination and filtering.
Args:
page: Page number to return (1-indexed)
per_page: Number of results per page (default 50, max 100)
sorts: Field to sort by (e.g., 'email', '-created_at')
ids: Comma-separated list of user IDs to filter
"""
sdk = get_sdk()
logger.info(f"Listing users (page={page}, per_page={per_page})")
# Parse ids if provided as comma-separated string
id_list = None
if ids:
id_list = [int(id.strip()) for id in ids.split(',')]
users = sdk.all_users(
page=page,
per_page=per_page,
sorts=sorts,
ids=id_list
)
logger.info(f"Retrieved {len(users)} users")
return {
"users": [{
"id": u.id,
"first_name": u.first_name,
"last_name": u.last_name,
"email": u.email,
"is_disabled": u.is_disabled,
} for u in users],
"count": len(users),
"page": page,
"per_page": per_page,
"status": "success"
}
@handle_sdk_errors
async def search_users(
first_name: Optional[str] = None,
last_name: Optional[str] = None,
email: Optional[str] = None,
is_disabled: Optional[bool] = None,
filter_or: Optional[bool] = False,
page: Optional[int] = None,
per_page: Optional[int] = 50
) -> Dict[str, Any]:
"""
Search users by various criteria.
Args:
first_name: Filter by first name (partial match)
last_name: Filter by last name (partial match)
email: Filter by email (partial match)
is_disabled: Filter by disabled status
filter_or: If True, use OR logic; if False, use AND logic
page: Page number
per_page: Results per page
"""
sdk = get_sdk()
logger.info(f"Searching users (email={email}, first_name={first_name})")
users = sdk.search_users(
first_name=first_name,
last_name=last_name,
email=email,
is_disabled=is_disabled,
filter_or=filter_or,
page=page,
per_page=per_page
)
logger.info(f"Found {len(users)} matching users")
return {
"users": [{
"id": u.id,
"first_name": u.first_name,
"last_name": u.last_name,
"email": u.email,
"is_disabled": u.is_disabled,
"role_ids": u.role_ids,
"group_ids": u.group_ids,
} for u in users],
"count": len(users),
"status": "success"
}
@handle_sdk_errors
async def get_user(user_id: int) -> Dict[str, Any]:
"""
Get detailed information about a specific user.
Args:
user_id: The ID of the user to retrieve
"""
sdk = get_sdk()
logger.info(f"Getting user {user_id}")
user = sdk.user(user_id)
logger.info(f"Retrieved user: {user.email}")
return {
"id": user.id,
"first_name": user.first_name,
"last_name": user.last_name,
"email": user.email,
"is_disabled": user.is_disabled,
"locale": user.locale,
"personal_space_id": user.personal_space_id,
"home_space_id": user.home_space_id,
"role_ids": user.role_ids,
"group_ids": user.group_ids,
"credentials_email": {"email": user.credentials_email.email} if user.credentials_email else None,
"status": "success"
}
@handle_sdk_errors
async def create_user(
first_name: str,
last_name: str,
email: str,
is_disabled: Optional[bool] = False
) -> Dict[str, Any]:
"""
Create a new Looker user.
Args:
first_name: User's first name
last_name: User's last name
email: User's email address
is_disabled: Whether the user account should be disabled (default False)
"""
sdk = get_sdk()
logger.info(f"Creating user: {email}")
user_body = models.WriteUser(
first_name=first_name,
last_name=last_name,
is_disabled=is_disabled
)
created_user = sdk.create_user(body=user_body)
# Create email credentials for the user
if email:
try:
sdk.create_user_credentials_email(
created_user.id,
body=models.WriteCredentialsEmail(email=email)
)
logger.info(f"Created email credentials for user {created_user.id}")
except Exception as e:
logger.warning(f"Could not create email credentials: {e}")
logger.info(f"Created user {created_user.id}: {email}")
return {
"id": created_user.id,
"first_name": created_user.first_name,
"last_name": created_user.last_name,
"email": email,
"is_disabled": created_user.is_disabled,
"status": "success",
"message": f"User {email} created successfully"
}
@handle_sdk_errors
async def update_user(
user_id: int,
first_name: Optional[str] = None,
last_name: Optional[str] = None,
email: Optional[str] = None,
is_disabled: Optional[bool] = None
) -> Dict[str, Any]:
"""
Update an existing user's information.
Args:
user_id: The ID of the user to update
first_name: New first name (optional)
last_name: New last name (optional)
email: New email address (optional)
is_disabled: Enable/disable the user account (optional)
"""
sdk = get_sdk()
logger.info(f"Updating user {user_id}")
# Build update body with only provided fields
update_fields = {}
if first_name is not None:
update_fields['first_name'] = first_name
if last_name is not None:
update_fields['last_name'] = last_name
if is_disabled is not None:
update_fields['is_disabled'] = is_disabled
user_body = models.WriteUser(**update_fields)
updated_user = sdk.update_user(user_id, body=user_body)
# Update email credentials if email provided
if email:
try:
sdk.update_user_credentials_email(
user_id,
body=models.WriteCredentialsEmail(email=email)
)
logger.info(f"Updated email credentials for user {user_id}")
except Exception as e:
logger.warning(f"Could not update email credentials: {e}")
logger.info(f"Updated user {user_id}")
return {
"id": updated_user.id,
"first_name": updated_user.first_name,
"last_name": updated_user.last_name,
"is_disabled": updated_user.is_disabled,
"status": "success",
"message": f"User {user_id} updated successfully"
}
@handle_sdk_errors
async def delete_user(user_id: int, confirm: bool = False) -> Dict[str, Any]:
"""
Delete a user. Requires confirm=True to execute.
WARNING: This is a destructive operation that cannot be undone.
Args:
user_id: The ID of the user to delete
confirm: Must be True to execute the deletion
"""
if not confirm:
return {
"error": "Destructive operation requires confirm=True",
"warning": f"This will permanently delete user {user_id}. Set confirm=True to proceed.",
"user_id": user_id
}
sdk = get_sdk()
logger.warning(f"Deleting user {user_id} (confirmed)")
# Get user info before deletion for response
try:
user = sdk.user(user_id)
user_email = user.email or f"user_{user_id}"
except Exception:
user_email = f"user_{user_id}"
sdk.delete_user(user_id)
logger.warning(f"Deleted user {user_id}")
return {
"status": "success",
"deleted_user_id": user_id,
"deleted_user_email": user_email,
"message": f"User {user_id} has been permanently deleted"
}
@handle_sdk_errors
async def get_user_roles(user_id: int) -> Dict[str, Any]:
"""
Get all roles assigned to a user.
Args:
user_id: The ID of the user
"""
sdk = get_sdk()
logger.info(f"Getting roles for user {user_id}")
roles = sdk.user_roles(user_id)
logger.info(f"User {user_id} has {len(roles)} roles")
return {
"user_id": user_id,
"roles": [{
"id": r.id,
"name": r.name,
"permission_set_id": r.permission_set_id,
"model_set_id": r.model_set_id,
} for r in roles],
"count": len(roles),
"status": "success"
}
@handle_sdk_errors
async def set_user_roles(user_id: int, role_ids: str) -> Dict[str, Any]:
"""
Set roles for a user (replaces all existing roles).
Args:
user_id: The ID of the user
role_ids: Comma-separated list of role IDs to assign
"""
sdk = get_sdk()
logger.info(f"Setting roles for user {user_id}: {role_ids}")
# Parse role IDs
role_id_list = [int(rid.strip()) for rid in role_ids.split(',')]
# Set the roles
roles = sdk.set_user_roles(user_id, body=role_id_list)
logger.info(f"Set {len(roles)} roles for user {user_id}")
return {
"user_id": user_id,
"roles": [{
"id": r.id,
"name": r.name,
} for r in roles],
"count": len(roles),
"status": "success",
"message": f"Assigned {len(roles)} roles to user {user_id}"
}