#!/usr/bin/env python3
"""
Looker Admin MCP server - Administration tools for Looker.
Provides user, group, role, schedule, alert, and content access management.
"""
from mcp.server.fastmcp import FastMCP
import looker_sdk
from typing import Dict, Any, Optional, Callable, Awaitable
from dotenv import load_dotenv
import os
import sys
import json
import logging
import time
import asyncio
from looker_sdk.error import SDKError
import urllib.parse
import socket
import requests
import platform
import traceback
import re
from functools import wraps
# --- Set Working Directory ---
# The launcher passes the project root through LOOKER_MCP_PROJECT_ROOT; when it
# names an existing directory we switch into it, otherwise we stay where we are.
# All diagnostics go to stderr.
project_root = os.environ.get("LOOKER_MCP_PROJECT_ROOT")
if not project_root or not os.path.isdir(project_root):
    print(
        f"Warning: LOOKER_MCP_PROJECT_ROOT not set or invalid ('{project_root}'). CWD remains: {os.getcwd()}",
        file=sys.stderr,
        flush=True,
    )
else:
    try:
        os.chdir(project_root)
        print(
            f"Changed working directory to: {project_root}",
            file=sys.stderr,
            flush=True,
        )
    except Exception as chdir_error:
        # Best effort: report the failure and keep running from the old CWD.
        print(
            f"Error changing working directory to {project_root}: {chdir_error}",
            file=sys.stderr,
            flush=True,
        )
# --- End Set Working Directory ---
# Configure root logging: everything goes to stderr; MCP_DEBUG (any non-empty
# value) switches the level from INFO to DEBUG.
_log_level = logging.DEBUG if os.environ.get("MCP_DEBUG") else logging.INFO
logging.basicConfig(
    stream=sys.stderr,
    level=_log_level,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger('looker-admin-mcp')

# Startup banner
logger.info(f"Looker Admin MCP server starting up in CWD: {os.getcwd()}")

# --- Environment Variable Dump (Keep for debugging) ---
# NOTE(review): this dump inspects LOOKER_* names and runs before the .env file
# is loaded, while the rest of this file reads LOOKERSDK_* - confirm intended.
logger.info("--- Dumping Environment Variables (after potential chdir) ---")
# Only report presence for identifying values.
for _presence_var in ("LOOKER_BASE_URL", "LOOKER_CLIENT_ID"):
    _presence = 'set' if os.environ.get(_presence_var) else 'not set'
    logger.debug(f"{_presence_var}: {_presence}")
# For the secret, report only whether it is set and how long it is.
_secret_length = len(os.environ.get('LOOKER_CLIENT_SECRET', ''))
_secret_state = f"set (length: {_secret_length})" if _secret_length else 'not set'
logger.debug(f"LOOKER_CLIENT_SECRET: {_secret_state}")
# Non-sensitive settings are logged verbatim.
for _plain_var in ("LOOKER_VERIFY_SSL", "MCP_MODE"):
    logger.debug(f"{_plain_var}: {os.environ.get(_plain_var, 'not set')}")
logger.info("--- End Environment Variable Dump ---")
# --- End Debug ---

# Load the .env file from the current working directory. override=True lets
# values from the file replace variables already present in the environment.
dotenv_path = os.path.join(os.getcwd(), '.env')
logger.info(f"Attempting to load .env file from: {dotenv_path} (CWD: {os.getcwd()})")
found_dotenv = load_dotenv(dotenv_path=dotenv_path, verbose=True, override=True)
logger.info(f".env file loaded: {found_dotenv}")

# Create the MCP server instance that tools, prompts and resources attach to.
mcp = FastMCP("Looker-Admin-MCP")
logger.info("FastMCP server created")
# Register all admin tools with MCP server (called lazily to avoid circular imports)
# Guard flag: flipped to True once registration has completed.
_tools_registered = False


def register_admin_tools():
    """Attach every admin tool function to the module-level ``mcp`` server.

    The call is idempotent: after the first successful run the
    ``_tools_registered`` flag short-circuits later calls. The tool functions
    are imported inside the function body to avoid a circular import.
    """
    global _tools_registered
    if _tools_registered:
        return

    # Deferred import (circular-import avoidance). looker_me is registered
    # separately via its own decorator elsewhere in this module.
    from looker_admin_mcp.tools import (
        # User management
        list_users, search_users, get_user, create_user, update_user,
        delete_user, get_user_roles, set_user_roles,
        # Group management
        list_groups, search_groups, get_group, create_group, update_group,
        delete_group, list_group_users, add_user_to_group, remove_user_from_group,
        # Role management
        list_roles, get_role, create_role, update_role, delete_role,
        list_permission_sets, get_permission_set, list_model_sets, get_model_set,
        create_model_set, list_role_users,
        # Scheduled plans
        list_scheduled_plans, get_scheduled_plans_for_dashboard,
        get_scheduled_plans_for_look, get_scheduled_plan, create_scheduled_plan,
        update_scheduled_plan, delete_scheduled_plan, run_scheduled_plan_once,
        # Alerts
        search_alerts, get_alert, create_alert, update_alert, delete_alert,
        follow_alert, unfollow_alert,
        # Content access
        list_folders, get_folder, get_folder_children, get_folder_ancestors,
        search_folders, get_content_metadata_access, update_content_metadata_access,
        get_content_access,
        # System admin
        list_user_sessions, delete_user_session, list_user_attributes,
        get_user_attribute, get_user_attribute_values, set_user_attribute_value,
        get_api_versions, get_looker_version,
    )

    # One tuple per functional area; registration order follows this layout.
    tool_groups = (
        # User management (8 tools)
        (list_users, search_users, get_user, create_user, update_user,
         delete_user, get_user_roles, set_user_roles),
        # Group management (9 tools)
        (list_groups, search_groups, get_group, create_group, update_group,
         delete_group, list_group_users, add_user_to_group, remove_user_from_group),
        # Role management (11 tools)
        (list_roles, get_role, create_role, update_role, delete_role,
         list_permission_sets, get_permission_set, list_model_sets, get_model_set,
         create_model_set, list_role_users),
        # Scheduled plans (8 tools)
        (list_scheduled_plans, get_scheduled_plans_for_dashboard,
         get_scheduled_plans_for_look, get_scheduled_plan, create_scheduled_plan,
         update_scheduled_plan, delete_scheduled_plan, run_scheduled_plan_once),
        # Alerts (7 tools)
        (search_alerts, get_alert, create_alert, update_alert, delete_alert,
         follow_alert, unfollow_alert),
        # Content access (8 tools)
        (list_folders, get_folder, get_folder_children, get_folder_ancestors,
         search_folders, get_content_metadata_access, update_content_metadata_access,
         get_content_access),
        # System admin (8 tools)
        (list_user_sessions, delete_user_session, list_user_attributes,
         get_user_attribute, get_user_attribute_values, set_user_attribute_value,
         get_api_versions, get_looker_version),
    )
    admin_tools = [tool for group in tool_groups for tool in group]

    for tool in admin_tools:
        mcp.tool()(tool)

    _tools_registered = True
    logger.info(f"Registered {len(admin_tools)} admin tools")
# Initialize Looker SDK
# Module-level cache for the shared SDK client. It starts as None; get_sdk()
# fills it on first use and debug_sdk_config() resets it back to None.
SDK_INSTANCE = None
class SDKInitializationError(Exception):
    """Signal that the Looker SDK client could not be initialized."""
def init_looker_sdk():
    """Create a Looker 4.0 SDK client from ``LOOKERSDK_*`` environment variables.

    Returns:
        The client object produced by ``looker_sdk.init40()``.

    Raises:
        SDKInitializationError: if ``LOOKERSDK_BASE_URL``, ``LOOKERSDK_CLIENT_ID``
            or ``LOOKERSDK_CLIENT_SECRET`` is missing or empty, or if
            ``looker_sdk.init40()`` raises (the original exception is chained).
    """
    logger.info("Attempting Looker SDK Initialization...")
    base_url = os.environ.get("LOOKERSDK_BASE_URL")
    client_id = os.environ.get("LOOKERSDK_CLIENT_ID")
    # bool() matters here: this flag is interpolated into the log line below,
    # so it must never carry the secret value itself.
    client_secret_set = bool(os.environ.get("LOOKERSDK_CLIENT_SECRET"))
    # verify_ssl and timeout are only read for the diagnostic log line; the SDK
    # reads its own configuration when init40() runs.
    verify_ssl = os.environ.get("LOOKERSDK_VERIFY_SSL", "true").lower() in ['true', '1', 't']
    timeout = os.environ.get("LOOKERSDK_TIMEOUT")
    logger.info(f"SDK Init Params: BaseURL='{base_url}', ClientID='{client_id}', SecretSet={client_secret_set}, VerifySSL={verify_ssl}, Timeout={timeout}")

    if not (base_url and client_id and client_secret_set):
        err_msg = "Missing required Looker SDK environment variables (LOOKERSDK_BASE_URL, LOOKERSDK_CLIENT_ID, LOOKERSDK_CLIENT_SECRET)"
        logger.error(err_msg)
        raise SDKInitializationError(err_msg)

    logger.info("Required SDK variables seem present. Calling looker_sdk.init40()...")
    try:
        client = looker_sdk.init40()
    except Exception as e:
        # logger.exception records the message together with the traceback.
        logger.exception(f"Error during looker_sdk.init40(): {e}")
        raise SDKInitializationError(f"Failed to initialize Looker SDK: {e}") from e
    logger.info("looker_sdk.init40() completed successfully.")
    return client
def get_sdk():
    """Return the cached SDK client, creating it on first use.

    Raises:
        SDKInitializationError: propagated from ``init_looker_sdk()`` when the
            client has to be created and creation fails.
    """
    global SDK_INSTANCE
    if SDK_INSTANCE is not None:
        return SDK_INSTANCE
    logger.debug("Creating new SDK instance via init_looker_sdk()")
    SDK_INSTANCE = init_looker_sdk()
    return SDK_INSTANCE
# --- API Error Cleaning ---
def clean_api_error(error_message: str) -> str:
    """Collapse an HTML error page into a short, readable message.

    Args:
        error_message: Raw error text. Non-string values are converted with
            ``str()`` so this helper never raises while an error is being
            reported.

    Returns:
        ``"Received HTML error page: <title>"`` when the text is an HTML
        document with a non-empty ``<title>``; a generic HTML notice when it is
        HTML without a usable title; otherwise the message unchanged.
    """
    if not isinstance(error_message, str):
        error_message = str(error_message)

    # HTML is case-insensitive ("<!doctype html>", "<HTML>"), so compare the
    # lower-cased head of the message; 9 chars covers the longest marker.
    head = error_message.lstrip()[:9].lower()
    if not head.startswith(("<!doctype", "<html")):
        return error_message

    # Allow attributes on the tag, e.g. <title lang="en">.
    title_match = re.search(
        r"<title(?:\s[^>]*)?>(.*?)</title>",
        error_message,
        re.IGNORECASE | re.DOTALL,
    )
    title = title_match.group(1).strip() if title_match else ""
    if title:
        return f"Received HTML error page: {title}"
    return "Received HTML error page instead of JSON response"
# --- Tool Error Handling Decorator ---
def handle_sdk_errors(func: Callable[..., Awaitable[Dict[str, Any]]]) -> Callable[..., Awaitable[Dict[str, Any]]]:
    """Wrap an async tool so failures come back as ``{"error": ...}`` dicts.

    Before the tool runs, ``get_sdk()`` is called so initialization problems
    surface up front. Three failure classes are translated:
    ``SDKInitializationError``, ``SDKError`` (message passed through
    ``clean_api_error``) and any other ``Exception``. On success the tool's own
    return value is passed through untouched.
    """
    @wraps(func)
    async def wrapper(*args, **kwargs) -> Dict[str, Any]:
        tool_name = func.__name__
        logger.info(f"{tool_name} tool called")
        try:
            # Initialize (or reuse) the shared SDK first; the tool fetches the
            # instance itself via get_sdk(), so nothing is passed down.
            get_sdk()
            outcome = await func(*args, **kwargs)
        except SDKInitializationError as init_failure:
            error_msg = f"Looker SDK Initialization failed: {str(init_failure)}"
            logger.error(f"{tool_name}: {error_msg}")
            return {"error": error_msg}
        except SDKError as api_failure:
            failure_type = type(api_failure).__name__
            raw_error = getattr(api_failure, 'message', str(api_failure))
            error_msg = f"Looker API error: {clean_api_error(raw_error)}"
            logger.error(f"{tool_name}: {error_msg} (Type: {failure_type})")
            return {"error": error_msg, "error_type": failure_type}
        except Exception as unexpected:
            error_msg = f"Unexpected error in {tool_name}: {str(unexpected)}"
            logger.exception(error_msg)
            return {"error": error_msg, "error_type": type(unexpected).__name__}
        else:
            return outcome
    return wrapper
# --- MCP Tools ---
@mcp.tool()
@handle_sdk_errors
async def looker_me() -> Dict[str, Any]:
    """(Decorated) Get information about the currently authenticated Looker user."""
    # Returns id / first_name / last_name / email plus a fixed status string;
    # personal_space_id and home_space_id are added only when the user object
    # exposes them. Errors are translated by the handle_sdk_errors decorator.
    sdk = get_sdk()
    logger.info("Calling sdk.me()")
    user = sdk.me()
    logger.info(f"SDK me() call successful for user: {user.email}")

    response = {
        field: getattr(user, field)
        for field in ("id", "first_name", "last_name", "email")
    }
    response["status"] = "Connected successfully to Looker API"

    for optional_field in ('personal_space_id', 'home_space_id'):
        if hasattr(user, optional_field):
            response[optional_field] = getattr(user, optional_field)
    return response
@mcp.tool()
@handle_sdk_errors
async def check_looker_connection() -> Dict[str, Any]:
    """(Decorated) Check connectivity to the Looker API endpoint."""
    # Times a single sdk.me() round trip. "time_seconds" spans the whole check
    # (including logging), "api_call_time" only the SDK call; both are rounded
    # to two decimals. Failures are translated by handle_sdk_errors.
    sdk = get_sdk()
    base_url = os.environ.get("LOOKERSDK_BASE_URL", "")
    logger.info(f"Testing connection to Looker at {base_url}")

    overall_started = time.time()
    call_started = time.time()
    user = sdk.me()
    api_call_time = time.time() - call_started
    logger.debug(f"API call to sdk.me() took {api_call_time:.2f} seconds")
    logger.debug(f"Successfully connected as user: {user.email}")

    connection_time = time.time() - overall_started
    logger.info(f"Connection successful in {connection_time:.2f}s")

    report: Dict[str, Any] = {"base_url": base_url, "success": True}
    report["status"] = "Connected successfully to Looker API"
    report["time_seconds"] = round(connection_time, 2)
    report["api_call_time"] = round(api_call_time, 2)
    return report
# --- Non-SDK Tools / Resources (Keep as is) ---
@mcp.prompt()
async def echo_prompt(message: str) -> str:
    """Create an echo prompt"""
    # Logs the incoming text at debug level and wraps it in a fixed instruction.
    logger.debug(f"Prompt echo called with: {message}")
    prompt_text = f"Please process this message: {message}"
    return prompt_text
@mcp.resource("echo://{message}")
async def echo_resource(message: str) -> str:
    """Echo a message as a resource"""
    # Logs the incoming text at debug level and returns it behind a fixed prefix.
    logger.debug(f"Resource echo called with: {message}")
    echoed = f"Resource echo: {message}"
    return echoed
@mcp.resource("debug://status")
async def debug_status() -> Dict[str, Any]:
    """Debug endpoint to check server status"""
    # Reports a fixed "running" status, the MCP_MODE value, the current time and
    # a small environment summary. Credentials appear only as presence booleans.
    logger.info("Debug status endpoint called")

    status: Dict[str, Any] = {
        "status": "running",
        "mode": os.environ.get("MCP_MODE", "unknown"),
        "timestamp": time.time(),
    }

    environment: Dict[str, Any] = {}
    for credential_var in ("LOOKERSDK_BASE_URL", "LOOKERSDK_CLIENT_ID", "LOOKERSDK_CLIENT_SECRET"):
        environment[f"{credential_var}_SET"] = bool(os.environ.get(credential_var))
    for plain_var in ("LOOKERSDK_VERIFY_SSL", "MCP_DEBUG", "PYTHONUNBUFFERED"):
        environment[plain_var] = os.environ.get(plain_var)

    status["environment"] = environment
    return status
@mcp.tool()
async def health_check() -> Dict[str, Any]:
    """Simple health check endpoint"""
    # Liveness probe: no SDK access, just a fixed status and the current time.
    logger.info("Health check tool called")
    report: Dict[str, Any] = {"status": "ok"}
    report["timestamp"] = time.time()
    return report
# --- Debug Tools (Keep separate error handling for now) ---
@mcp.tool()
async def debug_network_connectivity() -> Dict[str, Any]:
    """Probe TCP and HTTP reachability of the configured Looker host.

    Reads ``LOOKERSDK_BASE_URL``, opens a TCP connection to its host and port,
    then issues ``GET <base_url>/api/4.0/alive``. Each probe records success,
    error text and elapsed seconds independently, so one failing does not hide
    the other.

    Returns:
        A dict with ``base_url``, ``host``, ``port``, ``socket_test``,
        ``http_test`` and ``environment``; or ``{"error": ...}`` when the base
        URL is unset or an unexpected exception occurs.
    """
    logger.info("debug_network_connectivity tool called")
    try:
        base_url = os.environ.get('LOOKERSDK_BASE_URL', '')
        if not base_url:
            return {"error": "LOOKERSDK_BASE_URL not set"}

        parsed_url = urllib.parse.urlparse(base_url)
        # .hostname strips userinfo and port and copes with bracketed IPv6
        # literals, none of which a manual split on ":" handles.
        host = parsed_url.hostname or ""
        port = parsed_url.port or (443 if parsed_url.scheme == "https" else 80)
        # Same parsing rule as init_looker_sdk() so both agree on TLS checks.
        verify_ssl = os.environ.get("LOOKERSDK_VERIFY_SSL", "true").lower() in ['true', '1', 't']

        # --- TCP probe ---
        socket_result = {"success": False, "error": None, "time_seconds": 0}
        if not host:
            socket_result["error"] = f"Could not determine host from base URL: {base_url}"
        else:
            start_time = time.time()
            try:
                # create_connection resolves the name and tries every returned
                # address (IPv4 and IPv6); the context manager closes the socket.
                with socket.create_connection((host, port), timeout=5):
                    socket_result["success"] = True
            except Exception as e:
                socket_result["error"] = str(e)
            finally:
                socket_result["time_seconds"] = time.time() - start_time

        # --- HTTP probe ---
        http_result = {"success": False, "status_code": None, "error": None, "time_seconds": 0}
        # Avoid a double slash when the base URL ends with "/".
        alive_url = f"{base_url.rstrip('/')}/api/4.0/alive"
        start_time = time.time()
        try:
            response = requests.get(alive_url, verify=verify_ssl, timeout=10)
            http_result["success"] = response.status_code < 400
            http_result["status_code"] = response.status_code
            # Keep only a short preview of the body.
            body = response.text
            http_result["content"] = body[:100] + "..." if len(body) > 100 else body
        except Exception as e:
            http_result["error"] = str(e)
        finally:
            http_result["time_seconds"] = time.time() - start_time

        env_info = {
            "python_version": sys.version,
            "platform": platform.platform(),
            "looker_sdk_version": getattr(looker_sdk, "__version__", "unknown"),
            "hostname": socket.gethostname()
        }
        return {
            "base_url": base_url,
            "host": host,
            "port": port,
            "socket_test": socket_result,
            "http_test": http_result,
            "environment": env_info
        }
    except Exception as e:
        logger.error(f"Error in debug_network_connectivity: {e}")
        return {"error": str(e)}
@mcp.tool()
async def debug_sdk_config() -> Dict[str, Any]:
    # Diagnostic tool: reports LOOKER_*/LOOKERSDK_* environment variables (values
    # whose name contains SECRET, PASSWORD or KEY are masked), whether .env and
    # looker.ini exist in the working directory, and the result of a throwaway
    # SDK initialization followed by me(). The shared SDK_INSTANCE is cleared
    # before and after the test.
    global SDK_INSTANCE
    logger.info("debug_sdk_config tool called")

    sensitive_markers = ('SECRET', 'PASSWORD', 'KEY')
    env_vars = {}
    for prefix in ['LOOKER_', 'LOOKERSDK_']:
        for key, value in os.environ.items():
            if not key.startswith(prefix):
                continue
            if any(marker in key for marker in sensitive_markers):
                # Reveal only the length, never the value.
                env_vars[key] = f"<set: {len(value)} characters>" if value else "<empty string>"
            else:
                env_vars[key] = value

    env_file_exists = os.path.exists('.env')
    ini_file_exists = os.path.exists('looker.ini')

    sdk_error = None
    test_sdk = None
    try:
        SDK_INSTANCE = None  # Force re-initialization for test
        logger.debug("Attempting test SDK initialization for debug_sdk_config")
        test_sdk = init_looker_sdk()  # Raises SDKInitializationError on failure
        logger.debug("Testing SDK with me() call for debug_sdk_config")
        test_sdk.me()
        logger.debug("SDK test successful for debug_sdk_config")
    except SDKInitializationError as init_failure:
        sdk_error = f"SDK initialization failed: {str(init_failure)}"
    except Exception as other_failure:
        # A client object means init succeeded and the API call is what failed.
        sdk_error = (
            f"SDK initialized but API call failed: {str(other_failure)}"
            if test_sdk
            else f"SDK initialization exception: {str(other_failure)}"
        )
    finally:
        SDK_INSTANCE = None  # Reset global instance after test

    runtime_info = {
        "python_version": sys.version,
        "sdk_version": getattr(looker_sdk, "__version__", "unknown"),
        "platform": platform.platform(),
        "working_directory": os.getcwd()
    }
    return {
        "environment_variables": env_vars,
        "env_file_exists": env_file_exists,
        "looker_ini_exists": ini_file_exists,
        "sdk_error": sdk_error,
        "sdk_initialization_working": sdk_error is None,
        "runtime_info": runtime_info
    }
# --- Main Entry Point & Transport Logic ---
# (Keep main(), imports of run_cursor_stdio_mode, configure_sse_mode - they belong here)
def _jsonrpc_result(request_id: Any, result: Any) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 success envelope."""
    return {"jsonrpc": "2.0", "id": request_id, "result": result}


def _jsonrpc_error(request_id: Any, code: int, message: str) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 error envelope."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "error": {"code": code, "message": message},
    }


def _write_jsonrpc(response: Dict[str, Any]) -> None:
    """Write one JSON-RPC message to stdout as a single line and flush it."""
    try:
        payload = json.dumps(response)
    except (TypeError, ValueError) as exc:
        # A tool returned something json cannot encode; report that to the
        # client instead of letting the exception end the read loop.
        logger.error(f"Response is not JSON serializable: {exc}")
        payload = json.dumps(
            _jsonrpc_error(response.get("id"), -32603, f"Internal error: {exc}")
        )
    sys.stdout.write(payload + "\n")
    sys.stdout.flush()
    logger.debug("Response flushed to stdout")


async def _invoke_cursor_tool(request_id: Any, tool_name: str) -> Dict[str, Any]:
    """Run one built-in tool by name and wrap its outcome in a JSON-RPC envelope."""
    # Assumes mcp.tool() hands back the decorated coroutine function so these
    # names stay directly awaitable - TODO confirm against the FastMCP version in use.
    handlers = {
        "health_check": health_check,
        "check_looker_connection": check_looker_connection,
        "debug_network_connectivity": debug_network_connectivity,
        "debug_sdk_config": debug_sdk_config,
        "looker_me": looker_me,
        # Admin tools will be added here as they are implemented
    }
    handler = handlers.get(tool_name)
    if handler is None:
        return _jsonrpc_error(request_id, -32601, f"Tool not found: {tool_name}")

    logger.info(f"Executing tool: {tool_name}")
    try:
        # The tools are coroutines: they must be awaited to obtain the result.
        # The SDK is not initialized here, so the diagnostic tools stay usable
        # when it is misconfigured; SDK-backed tools obtain it themselves.
        return _jsonrpc_result(request_id, await handler())
    except Exception as e:
        logger.exception(f"Error executing tool {tool_name}: {e}")
        return _jsonrpc_error(
            request_id, -32000, f"Error executing tool {tool_name}: {str(e)}"
        )


async def run_cursor_stdio_mode(mcp_server):
    """
    Run in stdio mode with special handling for Cursor MCP client.
    This manually handles the initialization protocol that Cursor expects.

    Reads newline-delimited JSON-RPC requests from stdin and writes exactly one
    response line to stdout for every parsed request. Handled methods:
    ``initialize``, ``tool/invoke/<name>`` and anything starting with
    ``resource``; other methods get a -32601 error. A heartbeat line is written
    to stderr whenever stdin stays idle for five seconds. The loop ends on EOF
    or keyboard interrupt.

    Args:
        mcp_server: The server instance; accepted for interface compatibility
            and not used by this implementation.
    """
    logger.info("Starting in Cursor-compatible stdio mode")
    logger.info("Waiting for input on stdin...")
    # Send a heartbeat message to indicate we're ready
    logger.info("Sending initial heartbeat message")
    sys.stderr.write("Looker Admin MCP server ready and waiting for input\n")
    sys.stderr.flush()
    try:
        import select
        while True:
            # Wait up to 5 seconds for stdin to become readable.
            ready, _, _ = select.select([sys.stdin], [], [], 5.0)
            if not ready:
                logger.debug("No input received, still waiting...")
                sys.stderr.write("Looker Admin MCP server still alive\n")
                sys.stderr.flush()
                continue

            logger.debug("Reading from stdin...")
            line = sys.stdin.readline()
            if not line:
                logger.debug("End of input stream")
                break
            line = line.strip()
            if not line:
                logger.debug("Empty line received, continuing...")
                continue
            logger.info(f"Received: {line}")

            try:
                request = json.loads(line)
            except json.JSONDecodeError:
                logger.error(f"Invalid JSON: {line}")
                _write_jsonrpc(_jsonrpc_error(None, -32700, "Parse error"))
                continue

            # Valid JSON that is not an object (e.g. a list or number) cannot be
            # a request; answer with "Invalid Request" instead of crashing.
            if not isinstance(request, dict):
                logger.error("Request is not a JSON object")
                _write_jsonrpc(_jsonrpc_error(None, -32600, "Invalid Request"))
                continue

            request_id = request.get("id")
            method = request.get("method", "")
            if not isinstance(method, str):
                logger.error("Request method is not a string")
                _write_jsonrpc(_jsonrpc_error(request_id, -32600, "Invalid Request"))
                continue

            if method == "initialize":
                # Handle initialization specially
                logger.info("Handling initialize request")
                response = _jsonrpc_result(request_id, {
                    "name": "Looker-Admin-MCP",
                    "version": "0.1.0",
                    "capabilities": {}
                })
            elif method.startswith("tool/invoke/"):
                logger.info(f"Handling tool invoke request: {method}")
                tool_name = method.split("/")[-1]
                response = await _invoke_cursor_tool(request_id, tool_name)
            elif method.startswith("resource"):
                logger.info(f"Handling resource request: {method}")
                # Everything after the first "/" is treated as the resource path.
                resource_path = method.split('/', 1)[1] if '/' in method else ""
                resource_result = await echo_resource(resource_path)
                response = _jsonrpc_result(request_id, {"resources": [resource_result]})
            else:
                logger.warning(f"Unknown method: {method}")
                response = _jsonrpc_error(
                    request_id, -32601, f"Method not found: {method}"
                )

            # Single exit point: every parsed request gets its response written.
            _write_jsonrpc(response)
    except KeyboardInterrupt:
        logger.info("Received keyboard interrupt, exiting")
    except Exception as e:
        logger.exception(f"Error in Cursor stdio mode: {str(e)}")
def configure_sse_mode(mcp_server):
    """
    Configure FastMCP server for SSE mode.
    (This seems misplaced - belongs in transports.py? But leave here if imported)

    Sets ``MCP_HOST``, ``MCP_PORT`` and ``MCP_DEBUG`` in the process environment
    and hands the server back to the caller.

    Args:
        mcp_server: The server instance to configure.

    Returns:
        ``mcp_server``; when it is ``None`` the module-level ``mcp`` instance
        is returned instead.
    """
    logger.info("Configuring FastMCP for SSE mode")
    # Set explicit host and port
    # NOTE(review): 0.0.0.0 listens on every interface, and these assignments
    # overwrite any values already present in the environment - confirm intended.
    os.environ["MCP_HOST"] = "0.0.0.0"  # Listen on all interfaces
    os.environ["MCP_PORT"] = "8000"
    # Ensure debug logging is enabled for FastMCP
    os.environ["MCP_DEBUG"] = "true"
    # Log the configuration
    logger.debug(f"SSE configuration: host={os.environ['MCP_HOST']}, port={os.environ['MCP_PORT']}")
    # Return the server the caller passed in rather than silently swapping in
    # the module global; fall back to the global only when nothing was given.
    return mcp_server if mcp_server is not None else mcp
def main():
    """Register the admin tools and start the server in the configured transport.

    The transport comes from ``MCP_MODE`` (case-insensitive, default
    ``stdio``). ``stdio`` and ``sse`` are supported; any other value logs an
    error and exits with status 1. Exceptions raised while running the
    transport are logged and re-raised.
    """
    # Tools must be registered before any transport starts serving.
    register_admin_tools()

    mode = os.environ.get('MCP_MODE', 'stdio').lower()
    logger.info(f"Starting MCP server with transport mode: {mode}")

    if mode not in ('stdio', 'sse'):
        logger.error(f"Unsupported transport mode: {mode}")
        sys.exit(1)

    try:
        if mode == 'stdio':
            from .transports import run_cursor_stdio_mode  # Correct import needed
            logger.info("Running stdio mode...")
            # The stdio runner takes the server instance as its argument.
            asyncio.run(run_cursor_stdio_mode(mcp))
        else:
            from .transports import configure_sse_mode  # Correct import needed
            logger.info("Configuring and running SSE mode...")
            # configure_sse_mode should handle the uvicorn run/block
            asyncio.run(configure_sse_mode(mcp))
    except Exception as run_error:
        logger.error(f"Error running MCP server: {str(run_error)}")
        logger.exception("Unhandled exception in main:")
        raise
# Start the server only when this file is executed directly, not on import.
if __name__ == "__main__":
    main()