#!/usr/bin/env python3
"""
Looker Admin MCP - Content access and folder management tools.
Provides folder operations and content permission management.
"""
import logging
from typing import Dict, Any, Optional
from looker_admin_mcp.server.looker import handle_sdk_errors, get_sdk
# Module-level logger shared by every tool function in this file; the
# logger name is a fixed string rather than derived from __name__.
logger = logging.getLogger('looker-admin-mcp.tools.content_access')
@handle_sdk_errors
async def list_folders(parent_id: Optional[str] = None) -> Dict[str, Any]:
    """
    List all folders, optionally filtered by parent.

    Args:
        parent_id: Filter by parent folder ID (optional). When omitted,
            every folder returned by the SDK is listed.

    Returns:
        A dict with a ``folders`` list (one summary dict per folder), the
        ``count`` of folders, and ``"status": "success"``.
    """
    sdk = get_sdk()
    logger.info(f"Listing folders (parent_id={parent_id})")

    if parent_id is None:
        folders = sdk.all_folders()
    else:
        # all_folders() takes no parent filter in the Looker 4.0 SDK, so
        # passing parent_id to it fails; search_folders() is the endpoint
        # that supports filtering on parent_id.
        folders = sdk.search_folders(parent_id=parent_id)

    logger.info(f"Retrieved {len(folders)} folders")
    return {
        "folders": [
            {
                "id": f.id,
                "name": f.name,
                "parent_id": f.parent_id,
                "child_count": f.child_count,
                "content_metadata_id": f.content_metadata_id,
                "is_shared_root": f.is_shared_root,
                "is_users_root": f.is_users_root,
            }
            for f in folders
        ],
        "count": len(folders),
        "status": "success",
    }
@handle_sdk_errors
async def get_folder(folder_id: str) -> Dict[str, Any]:
    """
    Fetch one folder and return its attributes as a plain dict.

    Args:
        folder_id: The ID of the folder to look up

    Returns:
        A dict holding the folder fields listed below, followed by
        ``"status": "success"``.
    """
    # Folder attributes copied verbatim into the response, in output order.
    exported_fields = (
        "id",
        "name",
        "parent_id",
        "child_count",
        "content_metadata_id",
        "creator_id",
        "created_at",
        "is_shared_root",
        "is_users_root",
        "is_embed_shared_root",
        "is_embed_users_root",
    )

    client = get_sdk()
    logger.info(f"Getting folder {folder_id}")
    found = client.folder(folder_id)
    logger.info(f"Retrieved folder: {found.name}")

    payload: Dict[str, Any] = {
        field: getattr(found, field) for field in exported_fields
    }
    payload["status"] = "success"
    return payload
@handle_sdk_errors
async def get_folder_children(folder_id: str) -> Dict[str, Any]:
    """
    Get all child items (subfolders, looks, dashboards) in a folder.

    Args:
        folder_id: The ID of the folder

    Returns:
        A dict with the ``folder_id``, separate ``subfolders``, ``looks``
        and ``dashboards`` lists, ``total_children`` (the combined length
        of the three lists), and ``"status": "success"``.
    """
    sdk = get_sdk()
    logger.info(f"Getting children of folder {folder_id}")

    # folder_children() yields only child folders in the Looker 4.0 API, so
    # sniffing attributes on its results never finds looks or dashboards.
    # Each content type is fetched from its own endpoint instead.
    subfolders = [
        {
            "id": child.id,
            "name": child.name,
            "child_count": child.child_count,
        }
        for child in sdk.folder_children(folder_id)
    ]
    looks = [
        {"id": look.id, "title": look.title}
        for look in sdk.folder_looks(folder_id)
    ]
    dashboards = [
        {"id": dashboard.id, "title": dashboard.title}
        for dashboard in sdk.folder_dashboards(folder_id)
    ]

    logger.info(f"Folder {folder_id} has {len(subfolders)} subfolders, {len(looks)} looks, {len(dashboards)} dashboards")
    return {
        "folder_id": folder_id,
        "subfolders": subfolders,
        "looks": looks,
        "dashboards": dashboards,
        "total_children": len(subfolders) + len(looks) + len(dashboards),
        "status": "success",
    }
@handle_sdk_errors
async def get_folder_ancestors(folder_id: str) -> Dict[str, Any]:
    """
    Return the chain of ancestor folders for the given folder.

    Args:
        folder_id: The ID of the folder whose ancestors are requested

    Returns:
        A dict with the ``folder_id``, an ``ancestors`` list of
        id/name/parent_id dicts, ``depth`` (the number of ancestors), and
        ``"status": "success"``.
    """
    client = get_sdk()
    logger.info(f"Getting ancestors of folder {folder_id}")
    chain = client.folder_ancestors(folder_id)
    depth = len(chain)
    logger.info(f"Folder {folder_id} has {depth} ancestors")

    ancestor_rows = []
    for node in chain:
        ancestor_rows.append(
            dict(id=node.id, name=node.name, parent_id=node.parent_id)
        )

    return dict(
        folder_id=folder_id,
        ancestors=ancestor_rows,
        depth=depth,
        status="success",
    )
@handle_sdk_errors
async def search_folders(
    name: Optional[str] = None,
    page: Optional[int] = None,
    per_page: Optional[int] = 50
) -> Dict[str, Any]:
    """
    Search for folders, forwarding the name and paging options to the SDK.

    Args:
        name: Folder name filter passed to the SDK search
        page: Page number passed to the SDK search
        per_page: Page size passed to the SDK search (defaults to 50)

    Returns:
        A dict with a ``folders`` list of id/name/parent_id/child_count
        dicts, the ``count`` of matches, and ``"status": "success"``.
    """
    client = get_sdk()
    logger.info(f"Searching folders (name={name})")

    query = {"name": name, "page": page, "per_page": per_page}
    matches = client.search_folders(**query)
    match_count = len(matches)
    logger.info(f"Found {match_count} matching folders")

    summaries = []
    for folder in matches:
        summaries.append({
            "id": folder.id,
            "name": folder.name,
            "parent_id": folder.parent_id,
            "child_count": folder.child_count,
        })

    return {
        "folders": summaries,
        "count": match_count,
        "status": "success",
    }
@handle_sdk_errors
async def get_content_metadata_access(content_metadata_id: int) -> Dict[str, Any]:
    """
    List the raw access rules attached to a content metadata record.

    Args:
        content_metadata_id: The content metadata ID (from folder or content)

    Returns:
        A dict with the ``content_metadata_id``, an ``access_rules`` list
        (id, permission_type, group_id, user_id per rule), the ``count``
        of rules, and ``"status": "success"``.
    """
    def _as_rule(entry: Any) -> Dict[str, Any]:
        # Flatten one SDK access object into the response shape.
        return {
            "id": entry.id,
            "permission_type": entry.permission_type,
            "group_id": entry.group_id,
            "user_id": entry.user_id,
        }

    client = get_sdk()
    logger.info(f"Getting content access for metadata ID {content_metadata_id}")
    grants = client.all_content_metadata_accesses(content_metadata_id)
    grant_count = len(grants)
    logger.info(f"Content {content_metadata_id} has {grant_count} access rules")

    response: Dict[str, Any] = {"content_metadata_id": content_metadata_id}
    response["access_rules"] = [_as_rule(entry) for entry in grants]
    response["count"] = grant_count
    response["status"] = "success"
    return response
@handle_sdk_errors
async def update_content_metadata_access(
    content_metadata_id: int,
    permission_type: str,
    group_id: Optional[int] = None,
    user_id: Optional[int] = None
) -> Dict[str, Any]:
    """
    Create an access rule granting a group or user permission on content.

    Args:
        content_metadata_id: The content metadata ID
        permission_type: Permission type ('view' or 'edit')
        group_id: Group to grant access to (optional)
        user_id: User to grant access to (optional)

    Returns:
        On success, a dict describing the created rule with
        ``"status": "success"``. If neither ``group_id`` nor ``user_id``
        is given, or ``permission_type`` is not 'view'/'edit', a dict with
        only an ``error`` message is returned and the SDK is not called.
    """
    # Input validation happens before any SDK work; the grantee check
    # comes first, so it wins when both inputs are invalid.
    has_grantee = bool(group_id or user_id)
    if not has_grantee:
        return {"error": "Either group_id or user_id must be provided"}

    allowed_permissions = ('view', 'edit')
    if permission_type not in allowed_permissions:
        return {"error": "permission_type must be 'view' or 'edit'"}

    client = get_sdk()
    logger.info(f"Updating content access for metadata ID {content_metadata_id}")

    # Imported lazily, after validation and logging, as in the original.
    from looker_sdk import models40 as models

    rule_fields = {
        "content_metadata_id": content_metadata_id,
        "permission_type": permission_type,
        "group_id": group_id,
        "user_id": user_id,
    }
    created = client.create_content_metadata_access(
        body=models.ContentMetaGroupUser(**rule_fields)
    )
    logger.info(f"Created content access rule for metadata ID {content_metadata_id}")

    outcome: Dict[str, Any] = {"id": created.id}
    outcome.update(rule_fields)
    outcome["status"] = "success"
    outcome["message"] = "Access rule created successfully"
    return outcome
@handle_sdk_errors
async def get_content_access(content_metadata_id: int) -> Dict[str, Any]:
    """
    Get all access controls for a content item with detailed information.

    Each access rule is enriched with the group and/or user it refers to.
    Lookups are best-effort: if fetching a group or user fails, the rule is
    still returned with the name "Unknown" and a warning is logged.

    Args:
        content_metadata_id: The content metadata ID

    Returns:
        A dict with the ``content_metadata_id``, an ``access_rules`` list
        (id, permission_type, and optional ``group`` / ``user`` details),
        the ``count`` of rules, and ``"status": "success"``.
    """
    sdk = get_sdk()
    logger.info(f"Getting detailed content access for metadata ID {content_metadata_id}")
    accesses = sdk.all_content_metadata_accesses(content_metadata_id)

    access_details = []
    for access in accesses:
        detail: Dict[str, Any] = {
            "id": access.id,
            "permission_type": access.permission_type,
        }

        if access.group_id:
            try:
                group = sdk.group(access.group_id)
            except Exception as exc:
                # Keep the best-effort fallback, but leave a trace of why
                # the lookup failed instead of swallowing it silently.
                logger.warning(
                    "Could not look up group %s for access rule %s: %s",
                    access.group_id, access.id, exc,
                )
                detail["group"] = {"id": access.group_id, "name": "Unknown"}
            else:
                detail["group"] = {"id": group.id, "name": group.name}

        if access.user_id:
            try:
                user = sdk.user(access.user_id)
            except Exception as exc:
                logger.warning(
                    "Could not look up user %s for access rule %s: %s",
                    access.user_id, access.id, exc,
                )
                detail["user"] = {"id": access.user_id, "name": "Unknown"}
            else:
                # Join only the name parts that are present so a missing
                # first or last name does not render as the text "None".
                name_parts = (user.first_name, user.last_name)
                detail["user"] = {
                    "id": user.id,
                    "email": user.email,
                    "name": " ".join(part for part in name_parts if part),
                }

        access_details.append(detail)

    logger.info(f"Retrieved detailed access for content {content_metadata_id}")
    return {
        "content_metadata_id": content_metadata_id,
        "access_rules": access_details,
        "count": len(access_details),
        "status": "success",
    }