auth_plugins.py•5.68 kB
"""pluggable authentication providers for different sdk families."""
from __future__ import annotations
import os
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class AuthPlugin(ABC):
"""base class for authentication plugins."""
@abstractmethod
def can_handle(self, sdk_name: str) -> bool:
"""check if this plugin can handle the given sdk."""
pass
@abstractmethod
def get_credentials(self) -> Dict[str, Any]:
"""return credentials/config for the sdk."""
pass
@abstractmethod
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""inject auth into client constructor kwargs."""
pass
class KubernetesAuthPlugin(AuthPlugin):
"""kubernetes authentication via kubeconfig."""
def can_handle(self, sdk_name: str) -> bool:
return sdk_name == "kubernetes"
def get_credentials(self) -> Dict[str, Any]:
from kubernetes import config
try:
config.load_kube_config()
return {"type": "kubeconfig", "loaded": True}
except Exception:
try:
config.load_incluster_config()
return {"type": "incluster", "loaded": True}
except Exception as e:
return {"type": "none", "error": str(e)}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
# kubernetes client auto-loads from kubeconfig
return kwargs
class GitHubAuthPlugin(AuthPlugin):
"""github authentication via personal access token."""
def can_handle(self, sdk_name: str) -> bool:
return sdk_name == "github"
def get_credentials(self) -> Dict[str, Any]:
token = os.getenv("GITHUB_TOKEN")
return {"token": token if token else None}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
token = os.getenv("GITHUB_TOKEN")
if token and "auth" not in kwargs:
kwargs["auth"] = token
return kwargs
class AzureAuthPlugin(AuthPlugin):
"""azure authentication via default credential chain."""
def can_handle(self, sdk_name: str) -> bool:
return sdk_name.startswith("azure")
def get_credentials(self) -> Dict[str, Any]:
from azure.identity import DefaultAzureCredential
try:
cred = DefaultAzureCredential()
return {"type": "default_chain", "credential": cred}
except Exception as e:
return {"type": "none", "error": str(e)}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
if "credential" not in kwargs:
from azure.identity import DefaultAzureCredential
kwargs["credential"] = DefaultAzureCredential()
return kwargs
class AWSAuthPlugin(AuthPlugin):
"""aws/boto3 authentication via default credential chain."""
def can_handle(self, sdk_name: str) -> bool:
return sdk_name in ["boto3", "botocore"]
def get_credentials(self) -> Dict[str, Any]:
# boto3 auto-discovers from ~/.aws, env vars, instance metadata
return {"type": "boto_default_chain"}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
# boto3 handles auth automatically
return kwargs
class GCPAuthPlugin(AuthPlugin):
"""gcp authentication via application default credentials."""
def can_handle(self, sdk_name: str) -> bool:
return sdk_name.startswith("google")
def get_credentials(self) -> Dict[str, Any]:
creds_path = os.getenv("GOOGLE_APPLICATION_CREDENTIALS")
return {"type": "adc", "creds_path": creds_path}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
# google clients auto-discover adc
return kwargs
class GenericAPIKeyPlugin(AuthPlugin):
"""generic api key authentication via env vars."""
def can_handle(self, sdk_name: str) -> bool:
# fallback for any sdk with {SDK_NAME}_API_KEY env var
return True
def get_credentials(self) -> Dict[str, Any]:
return {"type": "generic"}
def inject_auth(self, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
# try common patterns
for key in ["api_key", "token", "auth_token"]:
if key in kwargs:
continue
env_key = f"{client_class.__module__.split('.')[0].upper()}_API_KEY"
val = os.getenv(env_key)
if val:
kwargs[key] = val
break
return kwargs
class AuthManager:
"""manages auth plugins and selects appropriate one for each sdk."""
def __init__(self):
self.plugins: list[AuthPlugin] = [
KubernetesAuthPlugin(),
GitHubAuthPlugin(),
AzureAuthPlugin(),
AWSAuthPlugin(),
GCPAuthPlugin(),
GenericAPIKeyPlugin(), # fallback
]
def get_plugin(self, sdk_name: str) -> AuthPlugin:
"""select the first plugin that can handle the sdk."""
for plugin in self.plugins:
if plugin.can_handle(sdk_name):
return plugin
return self.plugins[-1] # fallback to generic
def inject_auth(self, sdk_name: str, client_class: type, kwargs: Dict[str, Any]) -> Dict[str, Any]:
"""inject authentication into client constructor."""
plugin = self.get_plugin(sdk_name)
return plugin.inject_auth(client_class, kwargs)