"""
AWS Managed Prometheus (AMP) client with SigV4 authentication.
This client handles HTTP requests to AMP endpoints with proper AWS SigV4 signing.
It supports both IRSA (IAM Roles for Service Accounts) and EKS Pod Identity
via boto3's default credential chain.
"""
import os
from datetime import datetime
from typing import Any
from urllib.parse import quote, urlencode

import boto3
import httpx
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
class AMPClient:
"""Client for AWS Managed Prometheus with SigV4 authentication."""
def __init__(
self,
workspace_id: str | None = None,
region: str | None = None,
) -> None:
"""
Initialize the AMP client.
Args:
workspace_id: AMP workspace ID. Defaults to PROMETHEUS_WORKSPACE_ID env var.
region: AWS region. Defaults to AWS_REGION env var or us-east-1.
"""
self.workspace_id = workspace_id or os.environ.get("PROMETHEUS_WORKSPACE_ID")
if not self.workspace_id:
raise ValueError(
"workspace_id must be provided or PROMETHEUS_WORKSPACE_ID environment variable must be set"
)
self.region = region or os.environ.get("AWS_REGION", "us-east-1")
self.base_url = (
f"https://aps-workspaces.{self.region}.amazonaws.com"
f"/workspaces/{self.workspace_id}"
)
# Create boto3 session for credentials (supports IRSA/Pod Identity)
self._session = boto3.Session(region_name=self.region)
self._http_client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
"""Get or create the async HTTP client."""
if self._http_client is None or self._http_client.is_closed:
self._http_client = httpx.AsyncClient(timeout=30.0)
return self._http_client
def _sign_request(
self,
method: str,
url: str,
headers: dict[str, str],
body: bytes | None = None,
) -> dict[str, str]:
"""
Sign an HTTP request with SigV4.
Args:
method: HTTP method (GET, POST, etc.)
url: Full URL to sign
headers: HTTP headers
body: Request body (optional)
Returns:
Headers with SigV4 signature added
"""
credentials = self._session.get_credentials()
if credentials is None:
raise RuntimeError("Unable to obtain AWS credentials")
# Create AWS request for signing
aws_request = AWSRequest(
method=method,
url=url,
headers=headers,
data=body or b"",
)
# Sign the request
SigV4Auth(credentials, "aps", self.region).add_auth(aws_request)
# Return signed headers
return dict(aws_request.headers)
async def _request(
self,
method: str,
path: str,
params: dict[str, Any] | None = None,
data: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""
Make a signed HTTP request to AMP.
Args:
method: HTTP method
path: API path (e.g., /api/v1/query)
params: Query parameters
data: POST data
Returns:
JSON response from AMP
"""
url = f"{self.base_url}{path}"
if params:
url = f"{url}?{urlencode(params)}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Host": f"aps-workspaces.{self.region}.amazonaws.com",
"X-Amz-Date": datetime.utcnow().strftime("%Y%m%dT%H%M%SZ"),
}
body = urlencode(data).encode() if data else None
signed_headers = self._sign_request(method, url, headers, body)
client = await self._get_client()
response = await client.request(
method=method,
url=url,
headers=signed_headers,
content=body,
)
response.raise_for_status()
return response.json()
async def query(self, promql: str, time: str | None = None) -> dict[str, Any]:
"""
Execute an instant PromQL query.
Args:
promql: PromQL query string
time: Evaluation timestamp (RFC3339 or Unix timestamp). Defaults to current time.
Returns:
Query result from Prometheus API
"""
params: dict[str, str] = {"query": promql}
if time:
params["time"] = time
return await self._request("POST", "/api/v1/query", data=params)
async def query_range(
self,
promql: str,
start: str,
end: str,
step: str = "1m",
) -> dict[str, Any]:
"""
Execute a range PromQL query.
Args:
promql: PromQL query string
start: Start timestamp (RFC3339 or Unix timestamp)
end: End timestamp (RFC3339 or Unix timestamp)
step: Query resolution step (e.g., "1m", "5m", "1h")
Returns:
Range query result from Prometheus API
"""
data = {
"query": promql,
"start": start,
"end": end,
"step": step,
}
return await self._request("POST", "/api/v1/query_range", data=data)
async def labels(self, match: list[str] | None = None) -> dict[str, Any]:
"""
Get all label names.
Args:
match: Optional list of series selectors to filter labels
Returns:
List of label names
"""
params: dict[str, Any] = {}
if match:
params["match[]"] = match
return await self._request("GET", "/api/v1/labels", params=params if params else None)
async def label_values(
self,
label_name: str,
match: list[str] | None = None,
) -> dict[str, Any]:
"""
Get values for a specific label.
Args:
label_name: The label name to get values for
match: Optional list of series selectors to filter values
Returns:
List of label values
"""
params: dict[str, Any] = {}
if match:
params["match[]"] = match
return await self._request(
"GET",
f"/api/v1/label/{label_name}/values",
params=params if params else None,
)
async def series(
self,
match: list[str],
start: str | None = None,
end: str | None = None,
) -> dict[str, Any]:
"""
Get time series matching a set of selectors.
Args:
match: List of series selectors
start: Start timestamp
end: End timestamp
Returns:
List of matching series
"""
params: dict[str, Any] = {"match[]": match}
if start:
params["start"] = start
if end:
params["end"] = end
return await self._request("GET", "/api/v1/series", params=params)
async def metadata(self, metric: str | None = None) -> dict[str, Any]:
"""
Get metric metadata.
Args:
metric: Optional metric name to filter metadata
Returns:
Metric metadata
"""
params: dict[str, str] = {}
if metric:
params["metric"] = metric
return await self._request(
"GET",
"/api/v1/metadata",
params=params if params else None,
)
async def close(self) -> None:
"""Close the HTTP client."""
if self._http_client:
await self._http_client.aclose()
self._http_client = None
async def __aenter__(self) -> "AMPClient":
"""Async context manager entry."""
return self
async def __aexit__(self, *args: Any) -> None:
"""Async context manager exit."""
await self.close()