import asyncio
import os
import time
from dataclasses import dataclass
from typing import Optional
import httpx
from dotenv import load_dotenv
def _load_env() -> None:
dotenv_path = os.getenv("DOTENV_PATH")
if dotenv_path:
load_dotenv(dotenv_path)
else:
load_dotenv()
@dataclass(frozen=True)
class OHIPConfig:
base_url: str
client_id: str
client_secret: str
app_key: str
@classmethod
def from_env(cls) -> "OHIPConfig":
_load_env()
base_url = os.getenv("OHIP_BASE_URL", "").strip()
client_id = os.getenv("OHIP_CLIENT_ID", "").strip()
client_secret = os.getenv("OHIP_CLIENT_SECRET", "").strip()
app_key = os.getenv("OHIP_APP_KEY", "").strip()
missing = [
name
for name, value in [
("OHIP_BASE_URL", base_url),
("OHIP_CLIENT_ID", client_id),
("OHIP_CLIENT_SECRET", client_secret),
("OHIP_APP_KEY", app_key),
]
if not value
]
if missing:
raise ValueError(f"Missing environment variables: {', '.join(missing)}")
return cls(
base_url=base_url,
client_id=client_id,
client_secret=client_secret,
app_key=app_key,
)
class OHIPAuthenticator:
_instance: Optional["OHIPAuthenticator"] = None
_instance_lock = asyncio.Lock()
def __init__(self, config: OHIPConfig) -> None:
self._config = config
self._token: Optional[str] = None
self._expires_at: float = 0.0
self._lock = asyncio.Lock()
@classmethod
async def get_instance(cls) -> "OHIPAuthenticator":
if cls._instance is None:
async with cls._instance_lock:
if cls._instance is None:
cls._instance = OHIPAuthenticator(OHIPConfig.from_env())
return cls._instance
async def get_valid_token(self) -> str:
now = time.time()
if self._token and (self._expires_at - now) > 60:
return self._token
async with self._lock:
now = time.time()
if self._token and (self._expires_at - now) > 60:
return self._token
await self._request_new_token()
if not self._token:
raise RuntimeError("Token retrieval failed")
return self._token
async def _request_new_token(self) -> None:
url = f"{self._config.base_url.rstrip('/')}/oauth/v1/tokens"
data = {"grant_type": "client_credentials"}
async with httpx.AsyncClient(timeout=30) as client:
response = await client.post(
url,
data=data,
auth=httpx.BasicAuth(
self._config.client_id, self._config.client_secret
),
headers={"Accept": "application/json"},
)
if response.status_code >= 400:
raise RuntimeError(f"OAuth error {response.status_code}: {response.text}")
payload = response.json()
access_token = payload.get("access_token")
if not access_token:
raise RuntimeError("OAuth response missing access_token")
expires_in = int(payload.get("expires_in", 0)) or 300
self._token = access_token
self._expires_at = time.time() + expires_in