"""Loki MCP tools."""
import logging
from typing import Any, Dict, Optional
from ..backends.loki import LokiClient
from ..utils.time_helpers import parse_time_range, to_loki_time
logger = logging.getLogger(__name__)
async def query_loki(
    client: LokiClient,
    query: str,
    start: Optional[str] = None,
    end: Optional[str] = None,
    limit: int = 100,
    direction: str = "backward"
) -> Dict[str, Any]:
    """
    Run a raw LogQL range query and wrap the outcome in a status dict.

    Args:
        client: Loki client used to issue the range query
        query: LogQL query string, forwarded to the client unmodified
        start: Start of the time window (relative like '1h' or absolute)
        end: End of the time window (relative like 'now' or absolute)
        limit: Maximum number of log entries
        direction: Query direction (forward or backward)

    Returns:
        On success: a dict with "success" set to True, the echoed "query",
        "start" and "end" (reported as "auto" / "now" when not supplied),
        "limit", and the client's response under "result".
        On any exception: a dict with "success" set to False, the exception
        text under "error", and the echoed "query".
    """
    try:
        # Resolve the requested window, then convert each bound to the
        # representation the Loki client expects.
        window_start, window_end = parse_time_range(start, end)
        loki_start = to_loki_time(window_start)
        loki_end = to_loki_time(window_end)
        payload = await client.query_range(
            query, loki_start, loki_end, limit, direction
        )
    except Exception as e:
        # Failures are reported to the caller as data rather than raised.
        logger.error(f"Error executing Loki query: {e}")
        return {
            "success": False,
            "error": str(e),
            "query": query
        }
    else:
        return {
            "success": True,
            "query": query,
            "start": start or "auto",
            "end": end or "now",
            "limit": limit,
            "result": payload
        }
def _logql_string(value: Any) -> str:
    """Render *value* as a double-quoted LogQL string literal.

    Backslashes, double quotes and newlines are escaped so that
    caller-supplied text cannot terminate the literal early or produce an
    invalid escape sequence in the generated query.
    """
    escaped = (
        str(value)
        # Backslash first, so the escapes added below are not re-escaped.
        .replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
    )
    return f'"{escaped}"'


async def search_logs(
    client: LokiClient,
    service: Optional[str] = None,
    namespace: Optional[str] = None,
    search_text: Optional[str] = None,
    level: Optional[str] = None,
    start: Optional[str] = None,
    end: Optional[str] = None,
    limit: int = 100
) -> Dict[str, Any]:
    """
    Search logs with simple filters.

    Builds a LogQL query from the supplied filters and runs it as a range
    query. Filter values are escaped before being embedded in the query, so
    text containing quotes or backslashes is matched literally.

    Args:
        client: Loki client
        service: Service/job name filter (matched against the `job` label)
        namespace: Namespace filter (matched against the `namespace` label)
        search_text: Text to search for in logs (`|=` line filter)
        level: Log level filter (error, warn, info, debug); applied as
            `| json | level="..."`
        start: Start time
        end: End time
        limit: Maximum number of results

    Returns:
        On success: a dict with "success" set to True, the generated
        "query", the echoed "filters", and the client's response under
        "result".
        On any exception: a dict with "success" set to False and the
        exception text under "error".
    """
    try:
        # Build the stream selector from whichever label filters were given.
        matchers = []
        if service:
            matchers.append(f"job={_logql_string(service)}")
        if namespace:
            matchers.append(f"namespace={_logql_string(namespace)}")

        if matchers:
            query = "{" + ",".join(matchers) + "}"
        else:
            # No label filter supplied: fall back to any stream with a
            # non-empty job label.
            query = '{job=~".+"}'

        # Add search text filter
        if search_text:
            query += f" |= {_logql_string(search_text)}"

        # Add level filter
        if level:
            query += f" | json | level={_logql_string(level)}"

        # Parse time range
        start_dt, end_dt = parse_time_range(start, end)
        start_ns = to_loki_time(start_dt)
        end_ns = to_loki_time(end_dt)

        # Direction is not passed here; the client decides what to use.
        result = await client.query_range(query, start_ns, end_ns, limit)
        return {
            "success": True,
            "query": query,
            "filters": {
                "service": service,
                "namespace": namespace,
                "search_text": search_text,
                "level": level
            },
            "result": result
        }
    except Exception as e:
        # Failures are reported to the caller as data rather than raised.
        logger.error(f"Error searching logs: {e}")
        return {
            "success": False,
            "error": str(e)
        }
async def list_log_labels(
    client: LokiClient,
    start: Optional[str] = None,
    end: Optional[str] = None
) -> Dict[str, Any]:
    """
    Fetch the names of all log stream labels.

    Args:
        client: Loki client
        start: Optional start of the label discovery window
        end: Optional end of the label discovery window

    Returns:
        On success: a dict with "success" set to True, the number of labels
        under "count", and the label names under "labels".
        When the client response status is not "success", or on any
        exception: a dict with "success" set to False and an "error" message.
    """
    try:
        # Bounds stay None unless the caller constrained at least one side.
        loki_start = loki_end = None
        if start or end:
            window_start, window_end = parse_time_range(start, end)
            loki_start = to_loki_time(window_start)
            loki_end = to_loki_time(window_end)

        response = await client.labels(loki_start, loki_end)

        # Guard clause: anything other than an explicit success is a failure.
        if response.get("status") != "success":
            return {
                "success": False,
                "error": "Failed to fetch labels"
            }

        names = response.get("data", [])
        return {
            "success": True,
            "count": len(names),
            "labels": names
        }
    except Exception as e:
        # Failures are reported to the caller as data rather than raised.
        logger.error(f"Error listing log labels: {e}")
        return {
            "success": False,
            "error": str(e)
        }
async def list_log_label_values(
    client: LokiClient,
    label: str,
    start: Optional[str] = None,
    end: Optional[str] = None
) -> Dict[str, Any]:
    """
    Fetch every value recorded for one log label.

    Args:
        client: Loki client
        label: Label name (e.g., 'namespace', 'job')
        start: Optional start of the value discovery window
        end: Optional end of the value discovery window

    Returns:
        On success: a dict with "success" set to True, the echoed "label",
        the number of values under "count", and the values under "values".
        When the client response status is not "success", or on any
        exception: a dict with "success" set to False and an "error" message.
    """
    try:
        # Bounds stay None unless the caller constrained at least one side.
        loki_start = loki_end = None
        if start or end:
            window_start, window_end = parse_time_range(start, end)
            loki_start = to_loki_time(window_start)
            loki_end = to_loki_time(window_end)

        response = await client.label_values(label, loki_start, loki_end)

        # Guard clause: anything other than an explicit success is a failure.
        if response.get("status") != "success":
            return {
                "success": False,
                "error": "Failed to fetch label values"
            }

        found = response.get("data", [])
        return {
            "success": True,
            "label": label,
            "count": len(found),
            "values": found
        }
    except Exception as e:
        # Failures are reported to the caller as data rather than raised.
        logger.error(f"Error listing log label values: {e}")
        return {
            "success": False,
            "error": str(e)
        }