"""Loki client for querying logs."""
import logging
from typing import Any, Dict, List, Optional
import httpx
from ..config import LokiConfig
logger = logging.getLogger(__name__)
class LokiClient:
"""Client for querying Loki."""
def __init__(self, config: LokiConfig):
"""
Initialize Loki client.
Args:
config: Loki configuration
"""
self.url = config.url.rstrip('/')
self.timeout = config.timeout
self.tenant_id = config.tenant_id
self.auth = self._setup_auth(config.auth)
def _setup_auth(self, auth_config) -> Optional[Dict[str, str]]:
"""Setup authentication headers."""
headers = {}
# Add tenant ID header for multi-tenant Loki
if self.tenant_id:
headers["X-Scope-OrgID"] = self.tenant_id
# Add authentication if configured
if auth_config.type == "basic":
import base64
credentials = f"{auth_config.username}:{auth_config.password}"
encoded = base64.b64encode(credentials.encode()).decode()
headers["Authorization"] = f"Basic {encoded}"
elif auth_config.type == "bearer":
headers["Authorization"] = f"Bearer {auth_config.bearer_token}"
return headers if headers else None
async def query_range(
self,
query: str,
start: str,
end: str,
limit: int = 100,
direction: str = "backward"
) -> Dict[str, Any]:
"""
Execute range query on Loki.
Args:
query: LogQL query string
start: Start timestamp (nanoseconds)
end: End timestamp (nanoseconds)
limit: Maximum number of log entries
direction: Query direction (forward or backward)
Returns:
Query result dictionary
"""
params = {
"query": query,
"start": start,
"end": end,
"limit": limit,
"direction": direction
}
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.url}/loki/api/v1/query_range",
params=params,
headers=self.auth or {}
)
response.raise_for_status()
return response.json()
async def query(self, query: str, limit: int = 100) -> Dict[str, Any]:
"""
Execute instant query on Loki.
Args:
query: LogQL query string
limit: Maximum number of log entries
Returns:
Query result dictionary
"""
params = {
"query": query,
"limit": limit
}
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.url}/loki/api/v1/query",
params=params,
headers=self.auth or {}
)
response.raise_for_status()
return response.json()
async def labels(
self,
start: Optional[str] = None,
end: Optional[str] = None
) -> Dict[str, Any]:
"""
Get list of label names.
Args:
start: Start timestamp (nanoseconds)
end: End timestamp (nanoseconds)
Returns:
List of label names
"""
params = {}
if start:
params["start"] = start
if end:
params["end"] = end
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.url}/loki/api/v1/labels",
params=params,
headers=self.auth or {}
)
response.raise_for_status()
return response.json()
async def label_values(
self,
label: str,
start: Optional[str] = None,
end: Optional[str] = None
) -> Dict[str, Any]:
"""
Get values for a specific label.
Args:
label: Label name
start: Start timestamp (nanoseconds)
end: End timestamp (nanoseconds)
Returns:
List of label values
"""
params = {}
if start:
params["start"] = start
if end:
params["end"] = end
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.url}/loki/api/v1/label/{label}/values",
params=params,
headers=self.auth or {}
)
response.raise_for_status()
return response.json()
async def series(
self,
match: List[str],
start: Optional[str] = None,
end: Optional[str] = None
) -> Dict[str, Any]:
"""
Get log streams that match label sets.
Args:
match: List of label matchers
start: Start timestamp (nanoseconds)
end: End timestamp (nanoseconds)
Returns:
Series metadata
"""
params = {"match[]": match}
if start:
params["start"] = start
if end:
params["end"] = end
async with httpx.AsyncClient(timeout=self.timeout) as client:
response = await client.get(
f"{self.url}/loki/api/v1/series",
params=params,
headers=self.auth or {}
)
response.raise_for_status()
return response.json()