Skip to main content
Glama
agarwalvivek29

OpenTelemetry MCP Server

loki.py5.82 kB
"""Loki client for querying logs.""" import logging from typing import Any, Dict, List, Optional import httpx from ..config import LokiConfig logger = logging.getLogger(__name__) class LokiClient: """Client for querying Loki.""" def __init__(self, config: LokiConfig): """ Initialize Loki client. Args: config: Loki configuration """ self.url = config.url.rstrip('/') self.timeout = config.timeout self.tenant_id = config.tenant_id self.auth = self._setup_auth(config.auth) def _setup_auth(self, auth_config) -> Optional[Dict[str, str]]: """Setup authentication headers.""" headers = {} # Add tenant ID header for multi-tenant Loki if self.tenant_id: headers["X-Scope-OrgID"] = self.tenant_id # Add authentication if configured if auth_config.type == "basic": import base64 credentials = f"{auth_config.username}:{auth_config.password}" encoded = base64.b64encode(credentials.encode()).decode() headers["Authorization"] = f"Basic {encoded}" elif auth_config.type == "bearer": headers["Authorization"] = f"Bearer {auth_config.bearer_token}" return headers if headers else None async def query_range( self, query: str, start: str, end: str, limit: int = 100, direction: str = "backward" ) -> Dict[str, Any]: """ Execute range query on Loki. Args: query: LogQL query string start: Start timestamp (nanoseconds) end: End timestamp (nanoseconds) limit: Maximum number of log entries direction: Query direction (forward or backward) Returns: Query result dictionary """ params = { "query": query, "start": start, "end": end, "limit": limit, "direction": direction } async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.url}/loki/api/v1/query_range", params=params, headers=self.auth or {} ) response.raise_for_status() return response.json() async def query(self, query: str, limit: int = 100) -> Dict[str, Any]: """ Execute instant query on Loki. Args: query: LogQL query string limit: Maximum number of log entries Returns: Query result dictionary """ params = { "query": query, "limit": limit } async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.url}/loki/api/v1/query", params=params, headers=self.auth or {} ) response.raise_for_status() return response.json() async def labels( self, start: Optional[str] = None, end: Optional[str] = None ) -> Dict[str, Any]: """ Get list of label names. Args: start: Start timestamp (nanoseconds) end: End timestamp (nanoseconds) Returns: List of label names """ params = {} if start: params["start"] = start if end: params["end"] = end async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.url}/loki/api/v1/labels", params=params, headers=self.auth or {} ) response.raise_for_status() return response.json() async def label_values( self, label: str, start: Optional[str] = None, end: Optional[str] = None ) -> Dict[str, Any]: """ Get values for a specific label. Args: label: Label name start: Start timestamp (nanoseconds) end: End timestamp (nanoseconds) Returns: List of label values """ params = {} if start: params["start"] = start if end: params["end"] = end async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.url}/loki/api/v1/label/{label}/values", params=params, headers=self.auth or {} ) response.raise_for_status() return response.json() async def series( self, match: List[str], start: Optional[str] = None, end: Optional[str] = None ) -> Dict[str, Any]: """ Get log streams that match label sets. Args: match: List of label matchers start: Start timestamp (nanoseconds) end: End timestamp (nanoseconds) Returns: Series metadata """ params = {"match[]": match} if start: params["start"] = start if end: params["end"] = end async with httpx.AsyncClient(timeout=self.timeout) as client: response = await client.get( f"{self.url}/loki/api/v1/series", params=params, headers=self.auth or {} ) response.raise_for_status() return response.json()

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/agarwalvivek29/opentelemetry-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server