"""
Zabbix API 연결 및 호출을 관리하는 모듈
"""
import logging
import os
import requests
from datetime import datetime, timedelta
from typing import Any, Dict, List
logger = logging.getLogger(__name__)
class ZabbixAPI:
"""Zabbix API 연결 및 호출을 관리하는 클래스"""
def __init__(self):
self.url = os.getenv("ZABBIX_URL")
self.api_token = os.getenv("ZABBIX_API_TOKEN")
self.username = os.getenv("ZABBIX_USER")
self.password = os.getenv("ZABBIX_PASSWORD")
# SSL 검증 및 타임아웃은 환경 변수로 제어 가능
verify_env = os.getenv("ZABBIX_VERIFY_SSL", "true").strip().lower()
self.verify_ssl = verify_env in ("1", "true", "yes", "y")
try:
self.timeout = int(os.getenv("ZABBIX_TIMEOUT", "30").strip())
except ValueError:
self.timeout = 30
self.auth_token = None
self.session = requests.Session()
if not self.url:
raise ValueError("ZABBIX_URL 환경 변수가 설정되지 않았습니다.")
def _make_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Zabbix API 요청을 수행합니다."""
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1
}
# API 토큰 또는 인증 토큰 추가
if self.api_token:
payload["auth"] = self.api_token
elif self.auth_token:
payload["auth"] = self.auth_token
try:
response = self.session.post(
self.url,
json=payload,
headers={"Content-Type": "application/json"},
timeout=self.timeout,
verify=self.verify_ssl,
)
response.raise_for_status()
result = response.json()
if "error" in result:
err = result["error"]
error_msg = err.get("message", "알 수 없는 오류")
error_data = err.get("data")
error_code = err.get("code")
detail = f" (code={error_code})" if error_code is not None else ""
data_part = f": {error_data}" if error_data else ""
logger.error(f"Zabbix API 오류{detail}: {error_msg}{data_part}")
raise Exception(f"Zabbix API 오류{detail}: {error_msg}{data_part}")
return result.get("result", {})
except requests.exceptions.RequestException as e:
logger.error(f"HTTP 요청 오류: {e}")
raise Exception(f"Zabbix 서버 연결 실패: {e}")
def authenticate(self) -> bool:
"""사용자명/비밀번호로 인증을 수행합니다."""
if self.api_token:
return True # API 토큰이 있으면 별도 인증 불필요
if not self.username or not self.password:
raise ValueError("API 토큰 또는 사용자명/비밀번호가 설정되지 않았습니다.")
try:
result = self._make_request("user.login", {
"user": self.username,
"password": self.password
})
self.auth_token = result
logger.info("Zabbix 인증 성공")
return True
except Exception as e:
logger.error(f"Zabbix 인증 실패: {e}")
return False
def api_info_version(self) -> str:
"""Zabbix API 버전을 반환합니다."""
try:
result = self._make_request("apiinfo.version", {})
# Zabbix는 문자열 버전을 반환
return str(result)
except Exception as e:
logger.error(f"API 버전 조회 실패: {e}")
raise
def get_problems(self, recent_minutes: int = 60, limit: int = 100) -> List[Dict[str, Any]]:
"""현재 발생 중인 문제 목록을 조회합니다."""
# 최근 N분 이내의 타임스탬프 계산
time_from = int((datetime.now() - timedelta(minutes=recent_minutes)).timestamp())
# 일부 Zabbix 버전에서 recent/suppressed 조합이 Invalid params를 유발할 수 있어 보수적으로 제거
params = {
"output": ["eventid", "clock", "name", "severity", "acknowledged"],
"selectHosts": ["hostid", "host", "name"],
"time_from": time_from,
"sortfield": ["clock"],
"sortorder": "DESC",
"limit": limit
}
try:
problems = self._make_request("problem.get", params)
# 결과 포맷팅
formatted_problems = []
for problem in problems:
# 호스트 정보 추출 (Visible Name 우선, 없으면 HostName)
hosts = [
self._host_display_name(host)
for host in problem.get("hosts", [])
if isinstance(host, dict)
]
formatted_problem = {
"eventid": problem["eventid"],
"clock": int(problem["clock"]),
"timestamp": datetime.fromtimestamp(int(problem["clock"])).isoformat(),
"name": problem["name"],
"severity": int(problem["severity"]),
"severity_name": self._get_severity_name(int(problem["severity"])),
"hosts": hosts,
# acknowledged는 문자열/정수 모두 수용
"acknowledged": bool(int(problem.get("acknowledged", 0))) if str(problem.get("acknowledged", "0")).isdigit() else bool(problem.get("acknowledged", False))
}
formatted_problems.append(formatted_problem)
return formatted_problems
except Exception as e:
logger.error(f"문제 조회 실패: {e}")
raise
def get_triggers(self, min_severity: int = 0, limit: int = 100) -> List[Dict[str, Any]]:
"""현재 활성화된 트리거 목록을 조회합니다."""
params = {
"output": ["triggerid", "description", "priority", "lastchange", "value"],
"selectHosts": ["hostid", "host", "name"],
"filter": {
"value": 1 # 문제 상태인 트리거만
},
"min_severity": min_severity,
"sortfield": ["priority", "lastchange"],
"sortorder": "DESC",
"limit": limit,
"monitored": True, # 모니터링 중인 트리거만
"active": True # 활성화된 트리거만
}
try:
triggers = self._make_request("trigger.get", params)
# 결과 포맷팅
formatted_triggers = []
for trigger in triggers:
# 호스트 정보 추출 (Visible Name 우선, 없으면 HostName)
hosts = [
self._host_display_name(host)
for host in trigger.get("hosts", [])
if isinstance(host, dict)
]
formatted_trigger = {
"triggerid": trigger["triggerid"],
"description": trigger["description"],
"priority": int(trigger["priority"]),
"priority_name": self._get_severity_name(int(trigger["priority"])),
"hosts": hosts,
"lastchange": int(trigger["lastchange"]),
"lastchange_timestamp": datetime.fromtimestamp(int(trigger["lastchange"])).isoformat(),
"value": int(trigger["value"])
}
formatted_triggers.append(formatted_trigger)
return formatted_triggers
except Exception as e:
logger.error(f"트리거 조회 실패: {e}")
raise
def _get_severity_name(self, severity: int) -> str:
"""심각도 숫자를 이름으로 변환합니다."""
severity_names = {
0: "Not classified",
1: "Information",
2: "Warning",
3: "Average",
4: "High",
5: "Disaster"
}
return severity_names.get(severity, "Unknown")
def _host_display_name(self, host: Dict[str, Any]) -> str:
"""호스트 표기: Visible Name(name) 우선, 없으면 HostName(host)."""
name = (host.get("name") or "").strip()
if name:
return name
return (host.get("host") or "").strip() or str(host.get("hostid", ""))