"""Prometheus client for querying metrics."""
import logging
from typing import Any, Dict, List, Optional
import httpx
from ..config import PrometheusConfig
logger = logging.getLogger(__name__)
class PrometheusClient:
    """Async client for the Prometheus HTTP API (``/api/v1`` endpoints).

    Each request opens a short-lived ``httpx.AsyncClient`` and closes it when
    the response has been read, so an instance holds no open connections and
    needs no explicit cleanup.
    """

    def __init__(self, config: "PrometheusConfig"):
        """
        Initialize Prometheus client.

        Args:
            config: Prometheus configuration; its ``url``, ``timeout`` and
                ``auth`` attributes are read.
        """
        # Drop trailing slashes so endpoint paths can be appended verbatim.
        self.url = config.url.rstrip('/')
        self.timeout = config.timeout
        self.auth = self._setup_auth(config.auth)

    def _setup_auth(self, auth_config) -> Optional[Dict[str, str]]:
        """
        Build the authentication headers for the configured auth type.

        Args:
            auth_config: Object with a ``type`` attribute plus
                ``username``/``password`` (for ``"basic"``) or
                ``bearer_token`` (for ``"bearer"``). May be None.

        Returns:
            A dict holding the ``Authorization`` header, or None when no
            auth config is given or its type is neither "basic" nor "bearer".
        """
        # A missing auth section means "no authentication", not an error.
        if auth_config is None:
            return None

        if auth_config.type == "basic":
            import base64

            credentials = f"{auth_config.username}:{auth_config.password}"
            encoded = base64.b64encode(credentials.encode()).decode()
            return {"Authorization": f"Basic {encoded}"}

        if auth_config.type == "bearer":
            return {"Authorization": f"Bearer {auth_config.bearer_token}"}

        return None

    async def _get(self, path: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        Send a GET request to a Prometheus API endpoint and decode the reply.

        Args:
            path: Endpoint path beginning with ``/`` (appended to ``self.url``)
            params: Query-string parameters; list values are sent as
                repeated parameters by httpx

        Returns:
            The decoded JSON response body

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status
        """
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            response = await client.get(
                f"{self.url}{path}",
                params=params,
                headers=self.auth or {},
            )
            response.raise_for_status()
            return response.json()

    async def query(self, query: str, time: Optional[str] = None) -> Dict[str, Any]:
        """
        Execute instant Prometheus query.

        Args:
            query: PromQL query string
            time: Optional evaluation timestamp; omitted from the request
                when empty or None

        Returns:
            Query result dictionary
        """
        params: Dict[str, Any] = {"query": query}
        if time:
            params["time"] = time
        return await self._get("/api/v1/query", params)

    async def query_range(
        self,
        query: str,
        start: str,
        end: str,
        step: str = "15s"
    ) -> Dict[str, Any]:
        """
        Execute range Prometheus query.

        Args:
            query: PromQL query string
            start: Start timestamp
            end: End timestamp
            step: Query resolution step

        Returns:
            Query result dictionary
        """
        params: Dict[str, Any] = {
            "query": query,
            "start": start,
            "end": end,
            "step": step,
        }
        return await self._get("/api/v1/query_range", params)

    async def series(
        self,
        match: List[str],
        start: Optional[str] = None,
        end: Optional[str] = None
    ) -> Dict[str, Any]:
        """
        Get time series that match label sets.

        Args:
            match: List of series selectors (sent as ``match[]``)
            start: Start timestamp; omitted when empty or None
            end: End timestamp; omitted when empty or None

        Returns:
            Series metadata
        """
        params: Dict[str, Any] = {"match[]": match}
        if start:
            params["start"] = start
        if end:
            params["end"] = end
        return await self._get("/api/v1/series", params)

    async def labels(self, match: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Get list of label names.

        Args:
            match: Optional series selectors to filter labels

        Returns:
            List of label names
        """
        params: Dict[str, Any] = {}
        if match:
            params["match[]"] = match
        return await self._get("/api/v1/labels", params)

    async def label_values(
        self,
        label: str,
        match: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Get values for a specific label.

        Args:
            label: Label name
            match: Optional series selectors to filter values

        Returns:
            List of label values
        """
        from urllib.parse import quote

        params: Dict[str, Any] = {}
        if match:
            params["match[]"] = match
        # The label becomes a URL path segment: percent-encode it (including
        # "/") so unusual names cannot alter the endpoint being requested.
        escaped_label = quote(label, safe="")
        return await self._get(f"/api/v1/label/{escaped_label}/values", params)

    async def metadata(self, metric: Optional[str] = None) -> Dict[str, Any]:
        """
        Get metadata about metrics.

        Args:
            metric: Optional metric name to get metadata for

        Returns:
            Metric metadata
        """
        params: Dict[str, Any] = {}
        if metric:
            params["metric"] = metric
        return await self._get("/api/v1/metadata", params)