admin.py•5.57 kB
"""
Admin endpoints for managing OAuth clients and other administrative tasks.
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List, Optional
import uuid
import secrets
from app.db.session import get_db
from app.models.oauth import OAuthClient
from app.models.user import User
from app.schemas.oauth import OAuthClientCreate, OAuthClientUpdate, OAuthClient as OAuthClientSchema, OAuthClientRegisterResponse
from app.core.config import settings
from app.utils.logger import logger
from app.utils.admin import verify_admin_credentials
from app.utils.oauth import OAuthError
router = APIRouter()
@router.post("/clients/register", response_model=OAuthClientRegisterResponse)
async def register_client(
    client_data: OAuthClientCreate,
    db: Session = Depends(get_db),
    admin_user: User = Depends(verify_admin_credentials)
):
    """
    Register a new OAuth client.
    
    Args:
        client_data: Client registration data
        db: Database session
        admin_user: Authenticated admin user
        
    Returns:
        Registered client information with client credentials
    """
    try:
        # Generate client secret
        client_secret = secrets.token_urlsafe(32)
        
        # Create new OAuth client
        new_client = OAuthClient(
            client_id=uuid.uuid4(),
            client_secret=client_secret,
            client_name=client_data.client_name,
            redirect_uris=client_data.redirect_uris,
            scopes=client_data.scopes,
            is_confidential=client_data.is_confidential
        )
        
        db.add(new_client)
        db.commit()
        db.refresh(new_client)
        
        logger.info(f"Registered new OAuth client: {new_client.client_name}")
        
        # Return client credentials
        return {
            "client_id": str(new_client.client_id),
            "client_secret": new_client.client_secret,
            "client_name": new_client.client_name,
            "redirect_uris": new_client.redirect_uris,
            "scopes": new_client.scopes,
            "is_confidential": new_client.is_confidential
        }
    except Exception as e:
        logger.error(f"Error registering OAuth client: {str(e)}")
        raise OAuthError("client_registration_failed", "Failed to register OAuth client")
@router.get("/clients", response_model=List[OAuthClientSchema])
async def list_oauth_clients(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    admin_user: User = Depends(verify_admin_credentials)
):
    """
    List all registered OAuth clients.
    
    Args:
        skip: Number of records to skip
        limit: Maximum number of records to return
        db: Database session
        
    Returns:
        List of OAuth clients
    """
    clients = db.query(OAuthClient).offset(skip).limit(limit).all()
    return clients
@router.get("/clients/{client_id}", response_model=OAuthClientSchema)
async def get_oauth_client(
    client_id: uuid.UUID,
    db: Session = Depends(get_db),
    admin_user: User = Depends(verify_admin_credentials)
):
    """
    Get a specific OAuth client by ID.
    
    Args:
        client_id: UUID of the client to retrieve
        db: Database session
        
    Returns:
        OAuth client details
    """
    client = db.query(OAuthClient).filter(OAuthClient.client_id == client_id).first()
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OAuth client not found"
        )
    return client
@router.put("/clients/{client_id}", response_model=OAuthClientSchema)
async def update_oauth_client(
    client_id: uuid.UUID,
    client_data: OAuthClientUpdate,
    db: Session = Depends(get_db),
    admin_user: User = Depends(verify_admin_credentials)
):
    """
    Update an existing OAuth client.
    
    Args:
        client_id: UUID of the client to update
        client_data: Updated client data
        db: Database session
        
    Returns:
        Updated OAuth client details
    """
    client = db.query(OAuthClient).filter(OAuthClient.client_id == client_id).first()
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OAuth client not found"
        )
    
    # Update client fields if provided
    if client_data.client_name is not None:
        client.client_name = client_data.client_name
    if client_data.redirect_uris is not None:
        client.redirect_uris = client_data.redirect_uris
    if client_data.scopes is not None:
        client.scopes = client_data.scopes
    if client_data.is_confidential is not None:
        client.is_confidential = client_data.is_confidential
    
    db.commit()
    db.refresh(client)
    return client
@router.delete("/clients/{client_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_oauth_client(
    client_id: uuid.UUID,
    db: Session = Depends(get_db),
    admin_user: User = Depends(verify_admin_credentials)
):
    """
    Delete an OAuth client.
    
    Args:
        client_id: UUID of the client to delete
        db: Database session
        
    Returns:
        204 No Content on success
    """
    client = db.query(OAuthClient).filter(OAuthClient.client_id == client_id).first()
    if not client:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="OAuth client not found"
        )
    
    db.delete(client)
    db.commit()
    return None