"""
Monitoring & Observability Module for Proxmox MCP Server
This module implements:
- Prometheus integration and metrics collection
- Grafana dashboard setup and management
- Log aggregation (ELK stack, Fluentd, Loki)
- Performance analytics and optimization suggestions
"""
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import yaml
from prometheus_client import CollectorRegistry, Counter, Gauge, generate_latest
from grafana_api.grafana_face import GrafanaFace
from elasticsearch import Elasticsearch
from loguru import logger

from .client import ProxmoxClient
from .utils import format_error
class MonitoringManager:
"""Monitoring and observability for Proxmox infrastructure"""
def __init__(self, proxmox_client: ProxmoxClient):
self.client = proxmox_client
self.config_dir = Path.home() / ".proxmox_mcp" / "monitoring"
self.config_dir.mkdir(parents=True, exist_ok=True)
# Prometheus metrics
self.registry = CollectorRegistry()
self.setup_prometheus_metrics()
# Grafana client
self.grafana = None
# Elasticsearch client
self.elasticsearch = None
def setup_prometheus_metrics(self):
"""Setup Prometheus metrics collectors"""
self.vm_cpu_usage = Gauge(
'proxmox_vm_cpu_usage_percent',
'VM CPU usage percentage',
['vmid', 'name', 'node'],
registry=self.registry
)
self.vm_memory_usage = Gauge(
'proxmox_vm_memory_usage_bytes',
'VM memory usage in bytes',
['vmid', 'name', 'node'],
registry=self.registry
)
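        # Configured memory ceiling per VM, so alert rules and dashboards can
        # express memory usage as a ratio rather than a raw byte count
        self.vm_memory_max = Gauge(
            'proxmox_vm_memory_max_bytes',
            'VM configured maximum memory in bytes',
            ['vmid', 'name', 'node'],
            registry=self.registry
        )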
self.vm_disk_usage = Gauge(
'proxmox_vm_disk_usage_bytes',
'VM disk usage in bytes',
['vmid', 'name', 'node', 'device'],
registry=self.registry
)
self.vm_network_rx = Counter(
'proxmox_vm_network_receive_bytes_total',
'VM network receive bytes',
['vmid', 'name', 'node', 'interface'],
registry=self.registry
)
self.vm_network_tx = Counter(
'proxmox_vm_network_transmit_bytes_total',
'VM network transmit bytes',
['vmid', 'name', 'node', 'interface'],
registry=self.registry
)
self.node_cpu_usage = Gauge(
'proxmox_node_cpu_usage_percent',
'Node CPU usage percentage',
['node'],
registry=self.registry
)
self.node_memory_usage = Gauge(
'proxmox_node_memory_usage_bytes',
'Node memory usage in bytes',
['node'],
registry=self.registry
)
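    # Exposition note for the metrics registered above: once collect_metrics()
    # has run, generate_latest() on this registry yields lines such as
    # (values illustrative):
    #   proxmox_vm_cpu_usage_percent{vmid="101",name="web01",node="pve1"} 42.0
    #   proxmox_node_memory_usage_bytes{node="pve1"} 8.589934592e+09
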
async def setup_monitoring(
self,
stack_type: str = "prometheus",
retention_days: int = 30,
alert_rules: Optional[List[str]] = None,
webhook_url: Optional[str] = None,
dry_run: bool = False
) -> Dict[str, Any]:
"""Deploy comprehensive monitoring stack"""
try:
if dry_run:
return {
"action": "setup_monitoring",
"stack_type": stack_type,
"retention_days": retention_days,
"alert_rules": alert_rules or [],
"webhook_url": webhook_url,
"dry_run": True,
"status": "would_execute"
}
result = {
"stack_type": stack_type,
"retention_days": retention_days
}
if stack_type == "prometheus":
prometheus_result = await self._setup_prometheus(retention_days, alert_rules, webhook_url)
result.update(prometheus_result)
elif stack_type == "grafana":
grafana_result = await self._setup_grafana()
result.update(grafana_result)
elif stack_type == "elk":
elk_result = await self._setup_elk_stack()
result.update(elk_result)
else:
raise ValueError(f"Unsupported monitoring stack: {stack_type}")
logger.info(f"Monitoring stack {stack_type} setup completed")
result["status"] = "setup_completed"
return result
except Exception as e:
error_msg = f"Monitoring setup failed: {str(e)}"
logger.error(error_msg)
return format_error(error_msg, {"stack_type": stack_type})
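    # Usage sketch for setup_monitoring (hypothetical host value; call from an
    # async context):
    #   manager = MonitoringManager(ProxmoxClient(host="pve.example.com"))
    #   result = await manager.setup_monitoring(
    #       stack_type="prometheus",
    #       retention_days=30,
    #       alert_rules=["high_cpu", "high_memory", "vm_down"],
    #   )
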
async def _setup_prometheus(
self,
retention_days: int,
alert_rules: Optional[List[str]],
webhook_url: Optional[str]
) -> Dict[str, Any]:
"""Setup Prometheus monitoring"""
try:
# Create Prometheus configuration
prometheus_config = {
"global": {
"scrape_interval": "15s",
"evaluation_interval": "15s"
},
"rule_files": ["alert_rules.yml"] if alert_rules else [],
"alerting": {
"alertmanagers": [
{
"static_configs": [
{"targets": ["alertmanager:9093"]}
]
}
]
} if webhook_url else {},
"scrape_configs": [
{
"job_name": "prometheus",
"static_configs": [
{"targets": ["localhost:9090"]}
]
},
{
"job_name": "proxmox",
"static_configs": [
{"targets": [f"{self.client.host}:9100"]}
]
},
{
"job_name": "proxmox-mcp",
"static_configs": [
{"targets": ["localhost:8000"]}
]
}
]
}
# Save Prometheus config
prometheus_config_file = self.config_dir / "prometheus.yml"
with open(prometheus_config_file, 'w') as f:
yaml.dump(prometheus_config, f, default_flow_style=False)
# Create alert rules if specified
if alert_rules:
alert_rules_config = await self._create_alert_rules(alert_rules)
alert_rules_file = self.config_dir / "alert_rules.yml"
with open(alert_rules_file, 'w') as f:
yaml.dump(alert_rules_config, f, default_flow_style=False)
# Create Docker Compose for Prometheus stack
docker_compose = await self._create_prometheus_docker_compose(
retention_days, webhook_url
)
return {
"prometheus_config": str(prometheus_config_file),
"docker_compose": docker_compose,
"retention_days": retention_days,
"alert_rules_count": len(alert_rules) if alert_rules else 0
}
except Exception as e:
logger.error(f"Prometheus setup failed: {e}")
raise
async def _create_alert_rules(self, alert_rules: List[str]) -> Dict[str, Any]:
"""Create Prometheus alert rules"""
rules = []
for rule in alert_rules:
if rule == "high_cpu":
rules.append({
"alert": "HighCPUUsage",
"expr": "proxmox_vm_cpu_usage_percent > 80",
"for": "5m",
"labels": {"severity": "warning"},
"annotations": {
"summary": "High CPU usage on VM {{ $labels.name }}",
"description": "VM {{ $labels.name }} has CPU usage above 80% for more than 5 minutes."
}
})
elif rule == "high_memory":
rules.append({
"alert": "HighMemoryUsage",
"expr": "(proxmox_vm_memory_usage_bytes / 1024^3) > 0.8",
"for": "5m",
"labels": {"severity": "warning"},
"annotations": {
"summary": "High memory usage on VM {{ $labels.name }}",
"description": "VM {{ $labels.name }} has memory usage above 80% for more than 5 minutes."
}
})
elif rule == "vm_down":
rules.append({
"alert": "VMDown",
"expr": "up{job=\"proxmox\"} == 0",
"for": "1m",
"labels": {"severity": "critical"},
"annotations": {
"summary": "VM {{ $labels.name }} is down",
"description": "VM {{ $labels.name }} has been down for more than 1 minute."
}
})
return {
"groups": [
{
"name": "proxmox.rules",
"rules": rules
}
]
}
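    # For alert_rules=["high_cpu"], the rendered alert_rules.yml looks roughly
    # like (illustrative):
    #   groups:
    #   - name: proxmox.rules
    #     rules:
    #     - alert: HighCPUUsage
    #       expr: proxmox_vm_cpu_usage_percent > 80
    #       for: 5m
    #       labels:
    #         severity: warning
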
async def _create_prometheus_docker_compose(
self,
retention_days: int,
webhook_url: Optional[str]
) -> str:
"""Create Docker Compose file for Prometheus stack"""
compose_content = {
"version": "3.8",
"services": {
"prometheus": {
"image": "prom/prometheus:latest",
"container_name": "prometheus",
"ports": ["9090:9090"],
"volumes": [
"./prometheus.yml:/etc/prometheus/prometheus.yml",
"./alert_rules.yml:/etc/prometheus/alert_rules.yml",
"prometheus_data:/prometheus"
],
"command": [
"--config.file=/etc/prometheus/prometheus.yml",
"--storage.tsdb.path=/prometheus",
f"--storage.tsdb.retention.time={retention_days}d",
"--web.console.libraries=/etc/prometheus/console_libraries",
"--web.console.templates=/etc/prometheus/consoles",
"--web.enable-lifecycle"
]
},
"node-exporter": {
"image": "prom/node-exporter:latest",
"container_name": "node-exporter",
"ports": ["9100:9100"],
"volumes": [
"/proc:/host/proc:ro",
"/sys:/host/sys:ro",
"/:/rootfs:ro"
],
"command": [
"--path.procfs=/host/proc",
"--path.rootfs=/rootfs",
"--path.sysfs=/host/sys",
"--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)"
]
}
},
"volumes": {
"prometheus_data": {}
}
}
if webhook_url:
compose_content["services"]["alertmanager"] = {
"image": "prom/alertmanager:latest",
"container_name": "alertmanager",
"ports": ["9093:9093"],
"volumes": ["./alertmanager.yml:/etc/alertmanager/alertmanager.yml"]
}
# Create Alertmanager config
alertmanager_config = {
"global": {
"smtp_smarthost": "localhost:587",
"smtp_from": "alertmanager@example.org"
},
"route": {
"group_by": ["alertname"],
"group_wait": "10s",
"group_interval": "10s",
"repeat_interval": "1h",
"receiver": "web.hook"
},
"receivers": [
{
"name": "web.hook",
"webhook_configs": [
{"url": webhook_url}
]
}
]
}
alertmanager_file = self.config_dir / "alertmanager.yml"
with open(alertmanager_file, 'w') as f:
yaml.dump(alertmanager_config, f, default_flow_style=False)
# Save Docker Compose file
compose_file = self.config_dir / "docker-compose.yml"
with open(compose_file, 'w') as f:
yaml.dump(compose_content, f, default_flow_style=False)
return str(compose_file)
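    # Typical launch of the generated compose file (assuming Docker Compose v2
    # is available on the host):
    #   cd ~/.proxmox_mcp/monitoring && docker compose up -d
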
async def _setup_grafana(self) -> Dict[str, Any]:
"""Setup Grafana dashboards"""
try:
# Create Grafana Docker Compose
grafana_compose = {
"version": "3.8",
"services": {
"grafana": {
"image": "grafana/grafana:latest",
"container_name": "grafana",
"ports": ["3000:3000"],
"volumes": [
"grafana_data:/var/lib/grafana"
],
"environment": {
"GF_SECURITY_ADMIN_PASSWORD": "admin"
}
}
},
"volumes": {
"grafana_data": {}
}
}
compose_file = self.config_dir / "grafana-compose.yml"
with open(compose_file, 'w') as f:
yaml.dump(grafana_compose, f, default_flow_style=False)
# Create dashboard configurations
dashboards = await self._create_grafana_dashboards()
return {
"grafana_compose": str(compose_file),
"dashboards_created": len(dashboards),
"access_url": "http://localhost:3000",
"default_credentials": "admin/admin"
}
except Exception as e:
logger.error(f"Grafana setup failed: {e}")
raise
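    # Note on _setup_grafana: dashboards are written to disk but not
    # auto-imported. A minimal datasource provisioning file (illustrative;
    # mount under /etc/grafana/provisioning/datasources/ in the container)
    # would look like:
    #   apiVersion: 1
    #   datasources:
    #     - name: Prometheus
    #       type: prometheus
    #       url: http://prometheus:9090
    #       isDefault: true
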
async def _create_grafana_dashboards(self) -> List[Dict[str, Any]]:
"""Create Grafana dashboard configurations"""
dashboards = []
# VM Overview Dashboard
vm_dashboard = {
"dashboard": {
"id": None,
"title": "Proxmox VM Overview",
"tags": ["proxmox", "vm"],
"timezone": "browser",
"panels": [
{
"id": 1,
"title": "VM CPU Usage",
"type": "graph",
"targets": [
{
"expr": "proxmox_vm_cpu_usage_percent",
"legendFormat": "{{ name }}"
}
]
},
{
"id": 2,
"title": "VM Memory Usage",
"type": "graph",
"targets": [
{
"expr": "proxmox_vm_memory_usage_bytes / 1024^3",
"legendFormat": "{{ name }} (GB)"
}
]
}
],
"time": {
"from": "now-1h",
"to": "now"
},
"refresh": "5s"
}
}
dashboards.append(vm_dashboard)
# Save dashboard files
dashboards_dir = self.config_dir / "grafana_dashboards"
dashboards_dir.mkdir(exist_ok=True)
for i, dashboard in enumerate(dashboards):
dashboard_file = dashboards_dir / f"dashboard_{i+1}.json"
with open(dashboard_file, 'w') as f:
json.dump(dashboard, f, indent=2)
return dashboards
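    # The saved JSON can also be pushed through the Grafana HTTP API using the
    # imported GrafanaFace client, e.g. (default credentials assumed):
    #   grafana = GrafanaFace(auth=("admin", "admin"), host="localhost", port=3000)
    #   grafana.dashboard.update_dashboard(dashboard=vm_dashboard)
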
async def _setup_elk_stack(self) -> Dict[str, Any]:
"""Setup ELK (Elasticsearch, Logstash, Kibana) stack"""
try:
elk_compose = {
"version": "3.8",
"services": {
"elasticsearch": {
"image": "docker.elastic.co/elasticsearch/elasticsearch:8.11.0",
"container_name": "elasticsearch",
"ports": ["9200:9200"],
"environment": {
"discovery.type": "single-node",
"xpack.security.enabled": "false"
},
"volumes": ["elasticsearch_data:/usr/share/elasticsearch/data"]
},
"logstash": {
"image": "docker.elastic.co/logstash/logstash:8.11.0",
"container_name": "logstash",
"ports": ["5044:5044"],
"volumes": ["./logstash.conf:/usr/share/logstash/pipeline/logstash.conf"]
},
"kibana": {
"image": "docker.elastic.co/kibana/kibana:8.11.0",
"container_name": "kibana",
"ports": ["5601:5601"],
"environment": {
"ELASTICSEARCH_HOSTS": "http://elasticsearch:9200"
}
}
},
"volumes": {
"elasticsearch_data": {}
}
}
# Create Logstash configuration
            # syslog and beats inputs cannot share a port, so syslog listens
            # on 5140 and beats keeps the conventional 5044
            logstash_config = """
input {
  syslog {
    port => 5140
  }
  beats {
    port => 5044
  }
}
filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{IPORHOST:host} %{DATA:program}(?:\\[%{POSINT:pid}\\])?: %{GREEDYDATA:message}" }
    }
    date {
      match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ]
    }
  }
}
output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "proxmox-logs-%{+YYYY.MM.dd}"
  }
}
"""
compose_file = self.config_dir / "elk-compose.yml"
with open(compose_file, 'w') as f:
yaml.dump(elk_compose, f, default_flow_style=False)
logstash_file = self.config_dir / "logstash.conf"
logstash_file.write_text(logstash_config)
return {
"elk_compose": str(compose_file),
"logstash_config": str(logstash_file),
"elasticsearch_url": "http://localhost:9200",
"kibana_url": "http://localhost:5601"
}
except Exception as e:
logger.error(f"ELK stack setup failed: {e}")
raise
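    # Quick verification once the stack is up (endpoints as configured above):
    #   curl http://localhost:9200/_cluster/health   # Elasticsearch health
    #   http://localhost:5601                        # Kibana UI
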
async def collect_metrics(self, node: Optional[str] = None) -> Dict[str, Any]:
"""Collect metrics from Proxmox nodes and VMs"""
try:
node = node or self.client.get_first_node()
metrics = {"node": node, "timestamp": datetime.now().isoformat()}
# Collect node metrics
node_status = await self.client.nodes(node).status.get()
node_metrics = {
"cpu_usage": node_status.get("cpu", 0) * 100,
"memory_total": node_status.get("memory", {}).get("total", 0),
"memory_used": node_status.get("memory", {}).get("used", 0),
"disk_total": node_status.get("rootfs", {}).get("total", 0),
"disk_used": node_status.get("rootfs", {}).get("used", 0),
"uptime": node_status.get("uptime", 0)
}
# Update Prometheus metrics
self.node_cpu_usage.labels(node=node).set(node_metrics["cpu_usage"])
self.node_memory_usage.labels(node=node).set(node_metrics["memory_used"])
metrics["node_metrics"] = node_metrics
# Collect VM metrics
vms = await self.client.nodes(node).qemu.get()
vm_metrics = []
for vm in vms:
vmid = vm["vmid"]
vm_name = vm.get("name", f"vm-{vmid}")
try:
vm_status = await self.client.nodes(node).qemu(vmid).status.current.get()
vm_config = await self.client.nodes(node).qemu(vmid).config.get()
vm_metric = {
"vmid": vmid,
"name": vm_name,
"status": vm_status.get("status", "unknown"),
"cpu_usage": vm_status.get("cpu", 0) * 100,
"memory_max": vm_config.get("memory", 0) * 1024 * 1024, # MB to bytes
"memory_used": vm_status.get("mem", 0),
"disk_usage": vm_status.get("disk", 0),
"network_in": vm_status.get("netin", 0),
"network_out": vm_status.get("netout", 0)
}
                    # Update Prometheus metrics
                    self.vm_cpu_usage.labels(
                        vmid=vmid, name=vm_name, node=node
                    ).set(vm_metric["cpu_usage"])
                    self.vm_memory_usage.labels(
                        vmid=vmid, name=vm_name, node=node
                    ).set(vm_metric["memory_used"])
                    self.vm_memory_max.labels(
                        vmid=vmid, name=vm_name, node=node
                    ).set(vm_metric["memory_max"])
                    # netin/netout are cumulative totals; the network Counters
                    # would need delta tracking, so they are only returned here
vm_metrics.append(vm_metric)
except Exception as e:
logger.warning(f"Failed to collect metrics for VM {vmid}: {e}")
metrics["vm_metrics"] = vm_metrics
metrics["vm_count"] = len(vm_metrics)
logger.info(f"Collected metrics for node {node} and {len(vm_metrics)} VMs")
return metrics
except Exception as e:
error_msg = f"Metrics collection failed: {str(e)}"
logger.error(error_msg)
return format_error(error_msg, {"node": node})
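    # Shape of the payload returned by collect_metrics (values illustrative):
    #   {"node": "pve1", "timestamp": "...", "node_metrics": {...},
    #    "vm_metrics": [{"vmid": 101, "name": "web01", "cpu_usage": 12.5, ...}],
    #    "vm_count": 1}
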
async def performance_analysis(
self,
time_range: str = "24h",
        metrics: Optional[List[str]] = None,
generate_report: bool = True,
optimization_suggestions: bool = True
) -> Dict[str, Any]:
"""Analyze VM and host performance with optimization suggestions"""
try:
if metrics is None:
metrics = ["cpu", "memory", "disk", "network"]
# Collect historical data (mock implementation)
analysis_result = {
"time_range": time_range,
"metrics_analyzed": metrics,
"analysis_timestamp": datetime.now().isoformat()
}
# Simulate performance analysis
performance_data = await self._analyze_performance_trends(metrics)
analysis_result["performance_trends"] = performance_data
# Generate optimization suggestions
if optimization_suggestions:
suggestions = await self._generate_optimization_suggestions(performance_data)
analysis_result["optimization_suggestions"] = suggestions
# Generate report
if generate_report:
report = await self._generate_performance_report(analysis_result)
analysis_result["report_file"] = report
logger.info(f"Performance analysis completed for {time_range}")
return analysis_result
except Exception as e:
error_msg = f"Performance analysis failed: {str(e)}"
logger.error(error_msg)
return format_error(error_msg, {"time_range": time_range})
    async def _analyze_performance_trends(self, metrics: List[str]) -> Dict[str, Any]:
        """Analyze performance trends (placeholder: returns simulated values)"""
trends = {}
for metric in metrics:
if metric == "cpu":
trends["cpu"] = {
"average_usage": np.random.uniform(20, 80),
"peak_usage": np.random.uniform(80, 100),
"trend": "stable"
}
elif metric == "memory":
trends["memory"] = {
"average_usage": np.random.uniform(30, 70),
"peak_usage": np.random.uniform(70, 95),
"trend": "increasing"
}
elif metric == "disk":
trends["disk"] = {
"average_iops": np.random.uniform(100, 1000),
"peak_iops": np.random.uniform(1000, 5000),
"trend": "stable"
}
elif metric == "network":
trends["network"] = {
"average_throughput": np.random.uniform(100, 500), # Mbps
"peak_throughput": np.random.uniform(500, 1000),
"trend": "stable"
}
return trends
async def _generate_optimization_suggestions(
self,
performance_data: Dict[str, Any]
) -> List[Dict[str, Any]]:
"""Generate optimization suggestions based on performance data"""
suggestions = []
# CPU optimization
if "cpu" in performance_data:
cpu_data = performance_data["cpu"]
if cpu_data["average_usage"] > 70:
suggestions.append({
"type": "cpu",
"priority": "high",
"suggestion": "Consider increasing CPU allocation or migrating to a less loaded node",
"expected_improvement": "20-30% performance increase"
})
# Memory optimization
if "memory" in performance_data:
memory_data = performance_data["memory"]
if memory_data["average_usage"] > 80:
suggestions.append({
"type": "memory",
"priority": "high",
"suggestion": "Increase memory allocation or enable memory ballooning",
"expected_improvement": "Reduced swap usage and better responsiveness"
})
# Disk optimization
if "disk" in performance_data:
disk_data = performance_data["disk"]
if disk_data["average_iops"] > 800:
suggestions.append({
"type": "disk",
"priority": "medium",
"suggestion": "Consider migrating to SSD storage or implementing disk caching",
"expected_improvement": "50-100% I/O performance increase"
})
return suggestions
async def _generate_performance_report(
self,
analysis_data: Dict[str, Any]
) -> str:
"""Generate performance analysis report"""
try:
report_content = f"""
# Proxmox Performance Analysis Report
**Analysis Date:** {analysis_data['analysis_timestamp']}
**Time Range:** {analysis_data['time_range']}
**Metrics Analyzed:** {', '.join(analysis_data['metrics_analyzed'])}
## Performance Trends
"""
            for metric, data in analysis_data.get("performance_trends", {}).items():
                report_content += f"\n### {metric.upper()} Performance\n"
                # Keys differ per metric (percent, IOPS, Mbps), so emit whatever is present
                for key, value in data.items():
                    if key == "trend":
                        continue
                    label = key.replace("_", " ").title()
                    formatted = f"{value:.1f}" if isinstance(value, float) else str(value)
                    report_content += f"- {label}: {formatted}\n"
                report_content += f"- Trend: {data.get('trend', 'Unknown')}\n"
# Add optimization suggestions
suggestions = analysis_data.get("optimization_suggestions", [])
if suggestions:
report_content += "\n## Optimization Suggestions\n\n"
for i, suggestion in enumerate(suggestions, 1):
report_content += f"""
{i}. **{suggestion['type'].upper()}** (Priority: {suggestion['priority']})
- {suggestion['suggestion']}
- Expected Improvement: {suggestion['expected_improvement']}
"""
# Save report
report_file = self.config_dir / f"performance_report_{int(time.time())}.md"
report_file.write_text(report_content)
return str(report_file)
except Exception as e:
logger.error(f"Report generation failed: {e}")
raise
async def setup_logging(
self,
log_stack: str = "elk",
centralized: bool = True,
retention_policy: str = "30d",
indices: Optional[List[str]] = None,
dry_run: bool = False
) -> Dict[str, Any]:
"""Setup centralized logging for all VMs"""
try:
if dry_run:
return {
"action": "setup_logging",
"log_stack": log_stack,
"centralized": centralized,
"retention_policy": retention_policy,
"indices": indices or [],
"dry_run": True,
"status": "would_execute"
}
result = {
"log_stack": log_stack,
"centralized": centralized,
"retention_policy": retention_policy
}
if log_stack == "elk":
elk_result = await self._setup_elk_stack()
result.update(elk_result)
elif log_stack == "fluentd":
fluentd_result = await self._setup_fluentd()
result.update(fluentd_result)
elif log_stack == "loki":
loki_result = await self._setup_loki()
result.update(loki_result)
else:
raise ValueError(f"Unsupported log stack: {log_stack}")
logger.info(f"Logging stack {log_stack} setup completed")
result["status"] = "setup_completed"
return result
except Exception as e:
error_msg = f"Logging setup failed: {str(e)}"
logger.error(error_msg)
return format_error(error_msg, {"log_stack": log_stack})
async def _setup_fluentd(self) -> Dict[str, Any]:
"""Setup Fluentd for log aggregation"""
try:
            # index_name is ignored when logstash_format is enabled, and
            # type_name is rejected by Elasticsearch 8, so both are omitted
            fluentd_config = """
<source>
  @type syslog
  port 5140
  bind 0.0.0.0
  tag proxmox.syslog
</source>
<source>
  @type forward
  port 24224
  bind 0.0.0.0
</source>
<match proxmox.**>
  @type elasticsearch
  host elasticsearch
  port 9200
  logstash_format true
  logstash_prefix proxmox
</match>
"""
fluentd_compose = {
"version": "3.8",
"services": {
"fluentd": {
"image": "fluent/fluentd:v1.16-debian-1",
"container_name": "fluentd",
"ports": ["5140:5140", "24224:24224"],
"volumes": ["./fluentd.conf:/fluentd/etc/fluent.conf"]
}
}
}
config_file = self.config_dir / "fluentd.conf"
config_file.write_text(fluentd_config)
compose_file = self.config_dir / "fluentd-compose.yml"
with open(compose_file, 'w') as f:
yaml.dump(fluentd_compose, f, default_flow_style=False)
return {
"fluentd_config": str(config_file),
"fluentd_compose": str(compose_file)
}
except Exception as e:
logger.error(f"Fluentd setup failed: {e}")
raise
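    # VMs can forward their syslog stream to the aggregator by adding a line
    # such as the following (aggregator address assumed) to /etc/rsyslog.conf:
    #   *.* @fluentd-host:5140
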
async def _setup_loki(self) -> Dict[str, Any]:
"""Setup Grafana Loki for log aggregation"""
try:
loki_config = {
"auth_enabled": False,
"server": {
"http_listen_port": 3100
},
"ingester": {
"lifecycler": {
"address": "127.0.0.1",
"ring": {
"kvstore": {"store": "inmemory"},
"replication_factor": 1
}
},
"chunk_idle_period": "1h",
"max_chunk_age": "1h",
"chunk_target_size": 1048576,
"chunk_retain_period": "30s"
},
"schema_config": {
"configs": [
{
"from": "2020-10-24",
"store": "boltdb-shipper",
"object_store": "filesystem",
"schema": "v11",
"index": {
"prefix": "index_",
"period": "24h"
}
}
]
},
"storage_config": {
"boltdb_shipper": {
"active_index_directory": "/loki/boltdb-shipper-active",
"cache_location": "/loki/boltdb-shipper-cache",
"shared_store": "filesystem"
},
"filesystem": {
"directory": "/loki/chunks"
}
},
"compactor": {
"working_directory": "/loki/boltdb-shipper-compactor",
"shared_store": "filesystem"
}
}
loki_compose = {
"version": "3.8",
"services": {
"loki": {
"image": "grafana/loki:2.9.0",
"container_name": "loki",
"ports": ["3100:3100"],
"volumes": [
"./loki-config.yml:/etc/loki/local-config.yaml",
"loki_data:/loki"
]
},
"promtail": {
"image": "grafana/promtail:2.9.0",
"container_name": "promtail",
"volumes": [
"./promtail-config.yml:/etc/promtail/config.yml",
"/var/log:/var/log:ro"
]
}
},
"volumes": {
"loki_data": {}
}
}
# Promtail configuration
promtail_config = {
"server": {
"http_listen_port": 9080,
"grpc_listen_port": 0
},
"positions": {
"filename": "/tmp/positions.yaml"
},
"clients": [
{"url": "http://loki:3100/loki/api/v1/push"}
],
"scrape_configs": [
{
"job_name": "syslog",
"static_configs": [
{
"targets": ["localhost"],
"labels": {
"job": "syslog",
"__path__": "/var/log/*log"
}
}
]
}
]
}
# Save configurations
loki_config_file = self.config_dir / "loki-config.yml"
with open(loki_config_file, 'w') as f:
yaml.dump(loki_config, f, default_flow_style=False)
promtail_config_file = self.config_dir / "promtail-config.yml"
with open(promtail_config_file, 'w') as f:
yaml.dump(promtail_config, f, default_flow_style=False)
loki_compose_file = self.config_dir / "loki-compose.yml"
with open(loki_compose_file, 'w') as f:
yaml.dump(loki_compose, f, default_flow_style=False)
return {
"loki_config": str(loki_config_file),
"promtail_config": str(promtail_config_file),
"loki_compose": str(loki_compose_file),
"loki_url": "http://localhost:3100"
}
except Exception as e:
logger.error(f"Loki setup failed: {e}")
raise
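    # Once Promtail ships logs, they are queryable from Grafana with LogQL
    # (add Loki as a datasource at http://loki:3100), e.g.:
    #   {job="syslog"} |= "error"
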
async def get_prometheus_metrics(self) -> str:
"""Get current Prometheus metrics"""
try:
# Collect latest metrics
await self.collect_metrics()
# Return metrics in Prometheus format
return generate_latest(self.registry).decode('utf-8')
except Exception as e:
logger.error(f"Prometheus metrics collection failed: {e}")
return "# Error collecting metrics\n"