Skip to main content
Glama
Arize-ai

@arizeai/phoenix-mcp

Official
by Arize-ai
code-examples.md12.3 kB
# Code Examples ## LDAPConfig Dataclass ```python from dataclasses import dataclass from typing import Optional @dataclass class LDAPConfig: """LDAP server configuration.""" # Server connection host: str port: int = 389 tls_mode: Literal["none", "starttls", "ldaps"] = "starttls" tls_verify: bool = True # Advanced TLS (optional, for enterprise) tls_ca_cert_file: Optional[str] = None tls_client_cert_file: Optional[str] = None tls_client_key_file: Optional[str] = None # Bind credentials (service account) bind_dn: Optional[str] = None bind_password: Optional[str] = None # User search user_search_base: str user_search_filter: str = "(&(objectClass=user)(sAMAccountName=%s))" # Attribute mapping attr_email: str = "mail" attr_display_name: str = "displayName" attr_member_of: str = "memberOf" # Group search (for POSIX/OpenLDAP) group_search_base: Optional[str] = None group_search_filter: Optional[str] = None # Group to role mappings (Grafana-compatible format) # Matches Grafana's GroupToOrgRole struct (minus org_id which Phoenix doesn't support) group_role_mappings: list[dict[str, str]] # [{"group_dn": "...", "role": "..."}] @classmethod def from_env(cls) -> Optional["LDAPConfig"]: """Load LDAP config from environment variables. Raises: ValueError: If configuration is invalid json.JSONDecodeError: If GROUP_ROLE_MAPPINGS is not valid JSON """ host = os.getenv("PHOENIX_LDAP_HOST") if not host: return None # Parse and validate group role mappings (Grafana-compatible format) mappings_json = os.getenv("PHOENIX_LDAP_GROUP_ROLE_MAPPINGS", "[]") try: group_role_mappings = json.loads(mappings_json) except json.JSONDecodeError as e: raise ValueError( f"PHOENIX_LDAP_GROUP_ROLE_MAPPINGS is not valid JSON: {e}. " f"Expected format: [{{'group_dn': '...', 'role': 'ADMIN'}}]" ) # Validate role mappings structure (Grafana compatibility) if not isinstance(group_role_mappings, list): raise ValueError( "PHOENIX_LDAP_GROUP_ROLE_MAPPINGS must be a JSON array. " f"Expected format: [{{'group_dn': '...', 'role': 'ADMIN'}}]" ) VALID_ROLES = {"ADMIN", "MEMBER", "VIEWER"} # Phoenix internal role names for idx, mapping in enumerate(group_role_mappings): if not isinstance(mapping, dict): raise ValueError( f"PHOENIX_LDAP_GROUP_ROLE_MAPPINGS[{idx}] must be an object. " f"Got: {type(mapping).__name__}" ) if "group_dn" not in mapping: raise ValueError( f"PHOENIX_LDAP_GROUP_ROLE_MAPPINGS[{idx}] missing required field 'group_dn'" ) if "role" not in mapping: raise ValueError( f"PHOENIX_LDAP_GROUP_ROLE_MAPPINGS[{idx}] missing required field 'role'" ) if mapping["role"] not in VALID_ROLES: raise ValueError( f"PHOENIX_LDAP_GROUP_ROLE_MAPPINGS[{idx}]: role must be one of {VALID_ROLES}. " f"Got: '{mapping['role']}' (note: case-sensitive, use uppercase)" ) # Validate TLS mode tls_mode = os.getenv("PHOENIX_LDAP_TLS_MODE", "starttls") if tls_mode not in ("starttls", "ldaps"): raise ValueError( f"PHOENIX_LDAP_TLS_MODE must be 'starttls' or 'ldaps'. Got: '{tls_mode}'" ) # Validate group search configuration attr_member_of = os.getenv("PHOENIX_LDAP_ATTR_MEMBER_OF", "memberOf") group_search_base_dns_json = os.getenv("PHOENIX_LDAP_GROUP_SEARCH_BASE_DNS", "") group_search_filter = os.getenv("PHOENIX_LDAP_GROUP_SEARCH_FILTER") group_search_base_dns = json.loads(group_search_base_dns_json) if group_search_base_dns_json else [] # If group_search_filter is set, base_dns is required if group_search_filter and not group_search_base_dns: raise ValueError( "PHOENIX_LDAP_GROUP_SEARCH_FILTER is set but " "PHOENIX_LDAP_GROUP_SEARCH_BASE_DNS is missing. " "Both are required for POSIX group search." ) # Security warnings (log, don't fail) tls_mode = os.getenv("PHOENIX_LDAP_TLS_MODE", "starttls").lower() tls_verify = os.getenv("PHOENIX_LDAP_TLS_VERIFY", "true").lower() == "true" if tls_mode == "none": logger.warning( "PHOENIX_LDAP_TLS_MODE=none - credentials will be sent in plaintext! " "This is insecure for production." ) if tls_mode != "none" and not tls_verify: logger.warning( "PHOENIX_LDAP_TLS_VERIFY is false - certificates will not be validated! " "This is insecure for production (vulnerable to MITM attacks)." ) # Parse user search base DNs (JSON array) user_search_base_dns_json = os.getenv("PHOENIX_LDAP_USER_SEARCH_BASE_DNS", "") user_search_base_dns = json.loads(user_search_base_dns_json) if user_search_base_dns_json else [] return cls( host=host, port=int(os.getenv("PHOENIX_LDAP_PORT", "389")), tls_mode=tls_mode, tls_verify=tls_verify, bind_dn=os.getenv("PHOENIX_LDAP_BIND_DN"), bind_password=os.getenv("PHOENIX_LDAP_BIND_PASSWORD"), user_search_base_dns=tuple(user_search_base_dns), user_search_filter=os.getenv( "PHOENIX_LDAP_USER_SEARCH_FILTER", "(&(objectClass=user)(sAMAccountName=%s))" ), attr_email=os.getenv("PHOENIX_LDAP_ATTR_EMAIL", "mail"), attr_display_name=os.getenv("PHOENIX_LDAP_ATTR_DISPLAY_NAME", "displayName"), attr_member_of=attr_member_of, group_search_base_dns=tuple(group_search_base_dns), group_search_filter=group_search_filter, group_role_mappings=group_role_mappings, ) ``` ## LDAPAuthenticator Class (Skeleton) ```python from ldap3 import Server, Connection, Tls, SUBTREE from ldap3.core.exceptions import LDAPException from ldap3.utils.conv import escape_filter_chars import ssl class LDAPAuthenticator: """Handles LDAP authentication and user attribute retrieval.""" def __init__(self, config: LDAPConfig): self.config = config self.servers = self._create_servers() def _create_servers(self) -> list[Server]: """Create ldap3 Server objects for all configured hosts.""" hosts = [h.strip() for h in self.config.host.split(",")] tls_config = None if self.config.tls_mode != "none": tls_config = Tls( validate=ssl.CERT_REQUIRED if self.config.tls_verify else ssl.CERT_NONE ) servers = [] for host in hosts: server = Server( host, port=self.config.port, use_ssl=(self.config.tls_mode == "ldaps"), tls=tls_config, connect_timeout=10, # 10 second timeout - sufficient for most networks ) servers.append(server) return servers async def authenticate( self, username: str, password: str ) -> Optional[dict[str, Any]]: """ Authenticate user against LDAP and return user info. Returns: dict with keys: email, display_name, groups, user_dn or None if authentication fails """ # 1. Connect to LDAP server # 2. Bind with service account (if configured) # 3. Search for user by username # 4. Authenticate user (bind with user's credentials) # 5. Retrieve user attributes # 6. Query user's groups # 7. Return user info # Implementation details in full code... pass def map_groups_to_role(self, group_dns: list[str]) -> str: """Map LDAP group DNs to Phoenix role. Config format: {"group_dn": "...", "role": "ADMIN|MEMBER|VIEWER"} Uses Phoenix roles directly - no intermediary mapping needed. """ # Iterate through mappings in priority order (first match wins) for mapping in self.config.group_role_mappings: group_dn = mapping["group_dn"] role = mapping["role"] # Direct Phoenix role name # Check for wildcard match (matches all users) if group_dn == "*": return role # Check if user is in this specific group (case-insensitive) if self._is_member_of(group_dns, group_dn): return role # No matching groups - deny access return None def _is_member_of(self, user_groups: list[str], target_group: str) -> bool: """Check if user is member of LDAP group. Logic verified against Grafana's implementation: https://github.com/grafana/grafana/blob/main/pkg/services/ldap/helpers.go - Wildcard "*" matches all users - Case-insensitive DN comparison (strings.EqualFold) - Simple string match, no DN normalization """ # Wildcard matches everyone if target_group == "*": return True # Case-insensitive string comparison for group in user_groups: if group.lower() == target_group.lower(): return True return False ``` ## API Endpoint ```python from fastapi import APIRouter, HTTPException, status from pydantic import BaseModel router = APIRouter(prefix="/auth/ldap", tags=["auth"]) class LDAPLoginRequest(BaseModel): username: str password: str @router.post("/login") async def ldap_login( request: LDAPLoginRequest, session: AsyncSession = Depends(get_session), ) -> LoginResponse: """Authenticate user via LDAP.""" # Get LDAP config ldap_config = get_env_auth_settings().ldap_config if not ldap_config: raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="LDAP authentication not configured", ) # Authenticate with LDAP authenticator = LDAPAuthenticator(ldap_config) try: user_info = await authenticator.authenticate( request.username, request.password ) except LDAPException as e: logger.error(f"LDAP authentication failed: {e}") raise HTTPException( status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="Authentication service unavailable", ) if not user_info: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", ) # Map groups to role role_name = authenticator.map_groups_to_role(user_info["groups"]) if not role_name: raise HTTPException( status_code=status.HTTP_403_FORBIDDEN, detail="Access denied", ) # Create or update user in database user = await _process_ldap_user( session=session, ldap_username=request.username, user_info=user_info, role_name=role_name, ) # Issue JWT tokens access_token = create_access_token(user.id) refresh_token = create_refresh_token(user.id) return LoginResponse( access_token=access_token, refresh_token=refresh_token, ) ``` **Note**: For complete, production-ready implementations, see: - `src/phoenix/config.py` - LDAPConfig with full validation - `src/phoenix/server/ldap.py` - LDAPAuthenticator with all features - `src/phoenix/server/api/routers/auth.py` - LDAP login endpoint

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Arize-ai/phoenix'

If you have feedback or need assistance with the MCP directory API, please join our Discord server