"""Notification system for alerts."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from telegram import Bot
from telegram.error import TelegramError
from .config import settings
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Abstract base class for notification channels."""
@abstractmethod
async def send(self, message: str, **kwargs) -> bool:
"""Send a notification message."""
pass
class TelegramChannel(NotificationChannel):
"""Telegram notification channel."""
def __init__(self, bot_token: str, chat_id: str):
self.bot = Bot(token=bot_token)
self.chat_id = chat_id
logger.info("Initialized Telegram notification channel")
async def send(self, message: str, **kwargs) -> bool:
"""Send a message via Telegram."""
try:
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode="HTML",
disable_web_page_preview=True,
)
logger.info(f"Sent Telegram notification to {self.chat_id}")
return True
except TelegramError as e:
logger.error(f"Failed to send Telegram notification: {e}")
return False
class ConsoleChannel(NotificationChannel):
"""Console notification channel (for testing)."""
async def send(self, message: str, **kwargs) -> bool:
"""Log message instead of printing (MCP servers can't use stdout)."""
import sys
# Log to stderr, never stdout (MCP uses stdout for JSON-RPC)
logger.info(f"Console notification: {message}")
return True
class NotificationManager:
"""Manages multiple notification channels."""
def __init__(self):
self.channels: Dict[str, NotificationChannel] = {}
self._init_channels()
def _init_channels(self):
"""Initialize notification channels based on configuration."""
# Add Telegram channel if configured
if settings.telegram_bot_token and settings.telegram_chat_id:
try:
self.channels["telegram"] = TelegramChannel(
bot_token=settings.telegram_bot_token,
chat_id=settings.telegram_chat_id,
)
logger.info("Telegram channel initialized and ready")
except Exception as e:
logger.error(f"Failed to initialize Telegram channel: {e}")
else:
logger.warning("Telegram credentials not found - notifications will only log")
# Add console channel for fallback (logs to stderr, not stdout)
self.channels["console"] = ConsoleChannel()
async def send_notification(
self,
channel: str,
message: str,
alert: Optional[Any] = None,
) -> bool:
"""Send a notification through specified channel."""
if channel not in self.channels:
logger.warning(
f"Channel '{channel}' not found, falling back to console"
)
channel = "console"
notification_channel = self.channels[channel]
try:
success = await notification_channel.send(message, alert=alert)
return success
except Exception as e:
logger.error(f"Error sending notification via {channel}: {e}")
return False
def add_channel(self, name: str, channel: NotificationChannel):
"""Add a custom notification channel."""
self.channels[name] = channel
logger.info(f"Added notification channel: {name}")
def list_channels(self) -> list[str]:
"""List available notification channels."""
return list(self.channels.keys())