# auth.py (7.94 kB)
"""GCP authentication and client management."""
import asyncio
from typing import List, Dict, Any, Optional
import structlog
from google.auth import default
from google.auth.credentials import Credentials
from google.cloud import logging as cloud_logging
from google.cloud import monitoring_v3
from google.cloud import error_reporting
try:
from google.cloud.resourcemanager_v1 import ProjectsClient
from google.cloud.resourcemanager_v1.types import ListProjectsRequest, GetProjectRequest, Project
HAS_RESOURCE_MANAGER = True
except ImportError:
ProjectsClient = None
ListProjectsRequest = None
GetProjectRequest = None
Project = None
HAS_RESOURCE_MANAGER = False
from .config import Config
from .exceptions import AuthenticationError, GCPServiceError
logger = structlog.get_logger(__name__)
class GCPAuthenticator:
    """Handles GCP authentication and client creation.

    ``initialize`` resolves credentials and a project ID according to the
    configured authentication method, then builds the Cloud Logging, Cloud
    Monitoring, Error Reporting and (when the optional package imported)
    Resource Manager clients and stores them on the instance.
    """

    def __init__(self, config: "Config"):
        """Initialize the authenticator.

        Only stores the configuration and resets state; credentials and
        clients stay ``None`` until ``initialize`` is awaited.

        Args:
            config: Configuration object
        """
        self.config = config
        self.credentials: Optional[Credentials] = None
        self.project_id: Optional[str] = None

        # Clients (populated by _initialize_clients)
        self.logging_client: Optional[cloud_logging.Client] = None
        self.monitoring_client: Optional[monitoring_v3.MetricServiceClient] = None
        self.error_reporting_client: Optional[error_reporting.Client] = None
        self.resource_manager_client: Optional[ProjectsClient] = None

    async def initialize(self) -> None:
        """Initialize authentication and clients.

        Raises:
            AuthenticationError: On any failure while loading credentials,
                resolving the project ID or creating clients. Every exception
                raised inside (including an inner ``AuthenticationError``) is
                logged and re-wrapped with the original attached as
                ``__cause__``.
        """
        try:
            # Get credentials
            if self.config.authentication.method == "service_account":
                self._load_credentials_from_file()
            else:
                # Use application default credentials
                self.credentials, default_project = default()
                self.project_id = self.config.get_project_id() or default_project

            if not self.project_id:
                raise AuthenticationError("No project ID found in credentials or configuration")

            # Initialize clients
            await self._initialize_clients()

            logger.info(
                "Authentication initialized",
                project_id=self.project_id,
                cred_type=self.credentials.__class__.__name__ if self.credentials else 'unknown',
            )
        except Exception as e:
            logger.error("Authentication failed", error=str(e))
            # Chain the cause so the original traceback is not lost.
            raise AuthenticationError(f"Failed to initialize authentication: {e}") from e

    def _load_credentials_from_file(self) -> None:
        """Set ``credentials`` and ``project_id`` from the configured credentials file.

        The file's ``type`` field selects the loading strategy.

        Raises:
            AuthenticationError: If no credentials file path is configured.
        """
        credentials_path = self.config.authentication.service_account_path
        if not credentials_path:
            raise AuthenticationError("Service account path not configured")

        # Check credential type first
        import json
        with open(credentials_path, 'r', encoding='utf-8') as f:
            creds_data = json.load(f)
        cred_type = creds_data.get('type', 'unknown')

        if cred_type == 'service_account':
            # Service account credentials
            from google.oauth2 import service_account
            self.credentials = service_account.Credentials.from_service_account_file(
                credentials_path
            )
            # Prefer the key file's project; fall back to configuration so a
            # key file without project_id does not fail outright.
            self.project_id = creds_data.get('project_id') or self.config.get_project_id()
        elif cred_type == 'authorized_user':
            # User credentials (from gcloud auth application-default login)
            logger.info("Using authorized_user credentials (gcloud ADC)")
            # NOTE(review): default() is called without the configured path,
            # so the file read above only contributes its type and
            # quota_project_id; the credentials come from whatever the
            # application-default lookup finds - confirm this is intended.
            self.credentials, default_project = default()
            # For authorized_user, get project from config or environment
            self.project_id = (
                self.config.get_project_id() or
                creds_data.get('quota_project_id') or
                default_project
            )
        else:
            # Unknown credential type, try default flow
            logger.warning(f"Unknown credential type: {cred_type}, trying default flow")
            self.credentials, default_project = default()
            self.project_id = self.config.get_project_id() or default_project

    async def _initialize_clients(self) -> None:
        """Initialize GCP service clients.

        Each client is created best-effort: a failing initializer is logged
        with the client's name and leaves the corresponding attribute as
        ``None``; it does not abort the others.

        Raises:
            GCPServiceError: If awaiting the initializers itself fails.
        """
        initializers = {
            "logging": self._init_logging_client,
            "monitoring": self._init_monitoring_client,
            "error_reporting": self._init_error_reporting_client,
            "resource_manager": self._init_resource_manager_client,
        }
        try:
            # The _init_* coroutines contain no awaits, so they effectively
            # run one after another; gather collects their outcomes.
            outcomes = await asyncio.gather(
                *(init() for init in initializers.values()),
                return_exceptions=True,
            )
        except Exception as e:
            raise GCPServiceError(f"Failed to initialize GCP clients: {e}") from e

        # return_exceptions=True hands failures back as values; report them
        # instead of silently dropping them.
        for client_name, outcome in zip(initializers, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(
                    "Failed to initialize GCP client",
                    client=client_name,
                    error=str(outcome),
                )

    async def _init_logging_client(self) -> None:
        """Initialize Cloud Logging client."""
        self.logging_client = cloud_logging.Client(
            project=self.project_id,
            credentials=self.credentials
        )

    async def _init_monitoring_client(self) -> None:
        """Initialize Cloud Monitoring client."""
        self.monitoring_client = monitoring_v3.MetricServiceClient(
            credentials=self.credentials
        )

    async def _init_error_reporting_client(self) -> None:
        """Initialize Error Reporting client."""
        self.error_reporting_client = error_reporting.Client(
            project=self.project_id,
            credentials=self.credentials
        )

    async def _init_resource_manager_client(self) -> None:
        """Initialize Resource Manager client.

        Does nothing when the optional resource manager package failed to
        import.
        """
        if HAS_RESOURCE_MANAGER and ProjectsClient:
            self.resource_manager_client = ProjectsClient(
                credentials=self.credentials
            )

    async def list_accessible_projects(self) -> List[str]:
        """List projects accessible with current credentials.

        Returns:
            Project IDs of the ACTIVE projects found. When the Resource
            Manager client is unavailable or the request fails, a list
            holding only the current project ID (empty if there is none).
        """
        fallback = [self.project_id] if self.project_id else []
        if not self.resource_manager_client or not HAS_RESOURCE_MANAGER:
            return fallback

        try:
            # search_projects returns the projects the caller can see;
            # list_projects only lists children of an explicit parent.
            return [
                project.project_id
                for project in self.resource_manager_client.search_projects()
                if project.state == Project.State.ACTIVE
            ]
        except Exception as e:
            logger.warning("Failed to list projects", error=str(e))
            return fallback

    async def get_project_info(self, project_id: str) -> Dict[str, Any]:
        """Get information about a specific project.

        Args:
            project_id: ID of the project to look up.

        Returns:
            A dict with ``project_id``, ``name``, ``state`` and
            ``create_time`` on success; ``project_id`` and ``name`` (both the
            given ID) when no Resource Manager client is available; or
            ``project_id`` and ``error`` when the lookup fails.
        """
        if not self.resource_manager_client or not HAS_RESOURCE_MANAGER:
            return {"project_id": project_id, "name": project_id}

        try:
            request = GetProjectRequest(name=f"projects/{project_id}")
            project = self.resource_manager_client.get_project(request=request)
            return {
                "project_id": project.project_id,
                "name": project.name,
                "state": project.state.name,
                "create_time": project.create_time.isoformat() if project.create_time else None,
            }
        except Exception as e:
            logger.warning("Failed to get project info", project_id=project_id, error=str(e))
            return {"project_id": project_id, "error": str(e)}

    def get_project_id(self) -> str:
        """Get the current project ID.

        Raises:
            AuthenticationError: If no project ID has been resolved yet.
        """
        if not self.project_id:
            raise AuthenticationError("No project ID available")
        return self.project_id