We provide all the information about MCP servers via our MCP API.
curl -X GET 'https://glama.ai/api/mcp/v1/servers/malindarathnayake/Overwatch-mcp'
If you have feedback or need assistance with the MCP directory API, please join our Discord server
"""InfluxDB 2.x API client."""
import csv
import io
import logging
from typing import Any
from overwatch_mcp.clients.base import BaseHTTPClient
from overwatch_mcp.models.config import InfluxDBConfig
from overwatch_mcp.models.errors import ErrorCode, OverwatchError
logger = logging.getLogger(__name__)
class InfluxDBClient(BaseHTTPClient):
"""Client for InfluxDB 2.x API."""
def __init__(self, config: InfluxDBConfig):
"""
Initialize InfluxDB client.
Args:
config: InfluxDB configuration
"""
# Normalize URL - strip /api/v2 suffix if present (paths already include it)
url = config.url.rstrip("/")
if url.endswith("/api/v2"):
url = url[:-7]
elif url.endswith("/api"):
url = url[:-4]
headers = {
"Authorization": f"Token {config.token}",
"Accept": "application/csv",
"Content-Type": "application/vnd.flux",
}
super().__init__(
base_url=url,
timeout_seconds=config.timeout_seconds,
headers=headers,
verify_ssl=config.verify_ssl,
)
self.config = config
def _validate_bucket(self, bucket: str) -> None:
"""
Validate bucket against allowlist.
Args:
bucket: Bucket name to validate
Raises:
OverwatchError: If bucket is not in allowlist
"""
if bucket not in self.config.allowed_buckets:
raise OverwatchError(
code=ErrorCode.BUCKET_NOT_ALLOWED,
message=f"Bucket '{bucket}' is not in allowed list",
details={
"requested_bucket": bucket,
"allowed_buckets": self.config.allowed_buckets,
},
)
def _validate_query_bucket(self, query: str, bucket: str) -> None:
"""
Validate that query references the specified bucket.
Args:
query: Flux query
bucket: Expected bucket name
Raises:
OverwatchError: If query doesn't reference the correct bucket
"""
# Simple validation - check if 'from(bucket: "name")' exists
if f'from(bucket: "{bucket}")' not in query and f"from(bucket:\"{bucket}\")" not in query:
raise OverwatchError(
code=ErrorCode.INVALID_QUERY,
message=f"Query must reference bucket '{bucket}'",
details={"bucket": bucket, "query": query[:200]},
)
def _parse_csv_response(self, csv_text: str) -> list[dict[str, Any]]:
"""
Parse InfluxDB annotated CSV response to list of records.
InfluxDB returns CSV with annotation rows (starting with #) followed by data.
Args:
csv_text: CSV response text
Returns:
List of record dictionaries
Raises:
OverwatchError: If CSV parsing fails
"""
try:
lines = csv_text.strip().split("\n")
if not lines:
return []
# Find where annotation rows end and data begins
# Annotations start with #, then comes header row, then data rows
has_annotations = any(line.startswith("#") for line in lines)
non_annotation_lines = []
for line in lines:
if not line.startswith("#"):
non_annotation_lines.append(line)
# If we have no annotations and multiple lines, this isn't InfluxDB format
if not has_annotations and len(lines) > 1:
raise ValueError("Invalid InfluxDB CSV format: missing annotation rows")
if len(non_annotation_lines) < 1: # Need at least header
return []
# First non-annotation line is the header
header_line = non_annotation_lines[0]
data_lines = non_annotation_lines[1:] if len(non_annotation_lines) > 1 else []
# If there are no data lines (only empty lines), return empty
if not any(line.strip() for line in data_lines):
return []
# Parse CSV
reader = csv.DictReader(
io.StringIO("\n".join([header_line] + data_lines))
)
return list(reader)
except Exception as e:
raise OverwatchError(
code=ErrorCode.UPSTREAM_CLIENT_ERROR,
message=f"Failed to parse InfluxDB CSV response: {str(e)}",
details={"error": str(e), "csv_preview": csv_text[:500]},
)
async def query(self, query: str, bucket: str) -> list[dict[str, Any]]:
"""
Execute Flux query against InfluxDB.
Args:
query: Flux query
bucket: Target bucket
Returns:
List of records
Raises:
OverwatchError: On API errors, bucket not allowed, or parsing failures
"""
# Validate bucket
self._validate_bucket(bucket)
self._validate_query_bucket(query, bucket)
# Add org parameter
params = {"org": self.config.org}
# Make request with raw Flux query body
client = await self._get_client()
try:
response = await client.post(
"/api/v2/query",
params=params,
content=query,
headers={
**self.default_headers,
"Content-Type": "application/vnd.flux",
},
)
if response.status_code != 200:
raise OverwatchError(
code=ErrorCode.UPSTREAM_CLIENT_ERROR,
message=f"InfluxDB query failed: {response.status_code}",
details={
"status_code": response.status_code,
"response": response.text,
},
)
# Parse CSV response
return self._parse_csv_response(response.text)
except OverwatchError:
raise
except Exception as e:
raise OverwatchError(
code=ErrorCode.UPSTREAM_SERVER_ERROR,
message=f"InfluxDB request failed: {str(e)}",
details={"error": str(e)},
)
async def health_check(self) -> bool:
"""
Check if InfluxDB is reachable.
Returns:
True if healthy, False otherwise
"""
endpoint = "/health"
full_url = f"{self.base_url}{endpoint}"
logger.debug(f"InfluxDB health check: {full_url}")
try:
response = await self.get(endpoint)
logger.debug(f"InfluxDB health response: {response.status_code} - {response.text[:200]}")
return response.status_code == 200
except OverwatchError as e:
logger.debug(f"InfluxDB health check failed: {e}")
return False