"""Notification system for alerts."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from urllib.parse import quote
import httpx
from telegram import Bot
from telegram.error import TelegramError
from .config import settings
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Abstract base class for notification channels."""
@abstractmethod
async def send(self, message: str, **kwargs) -> bool:
"""Send a notification message."""
pass
class TelegramChannel(NotificationChannel):
"""Telegram notification channel."""
def __init__(self, bot_token: str, chat_id: str):
self.bot = Bot(token=bot_token)
self.chat_id = chat_id
logger.info("Initialized Telegram notification channel")
async def send(self, message: str, **kwargs) -> bool:
"""Send a message via Telegram."""
try:
await self.bot.send_message(
chat_id=self.chat_id,
text=message,
parse_mode="HTML",
disable_web_page_preview=True,
)
logger.info(f"Sent Telegram notification to {self.chat_id}")
return True
except TelegramError as e:
logger.error(f"Failed to send Telegram notification: {e}")
return False
class TelegramCallChannel(NotificationChannel):
"""Telegram call notification channel using CallMeBot API."""
def __init__(
self,
username: str,
default_lang: str = "en-US-Standard-B",
repeat_count: int = 1,
):
self.username = username
self.default_lang = default_lang
self.repeat_count = repeat_count
self.api_url = "http://api.callmebot.com/start.php"
logger.info(f"Initialized Telegram call notification channel for {username}")
async def send(self, message: str, **kwargs) -> bool:
"""Initiate a Telegram call with text-to-speech."""
try:
# Truncate message to 256 characters (CallMeBot limit)
if len(message) > 256:
message = message[:253] + "..."
logger.warning("Message truncated to 256 characters for TTS")
# URL-encode the message
encoded_message = quote(message)
# Build the API request URL
params = {
"user": self.username,
"text": encoded_message,
"lang": kwargs.get("lang", self.default_lang),
"rpt": kwargs.get("rpt", self.repeat_count),
}
# Build query string manually to avoid double encoding
query_parts = [f"{key}={value}" for key, value in params.items()]
url = f"{self.api_url}?{'&'.join(query_parts)}"
# Make HTTP request
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
if response.status_code == 200:
logger.info(f"Successfully initiated call to {self.username}")
return True
else:
logger.error(
f"CallMeBot API returned status {response.status_code}: {response.text}"
)
return False
except Exception as e:
logger.error(f"Failed to initiate Telegram call: {e}")
return False
class ConsoleChannel(NotificationChannel):
"""Console notification channel (for testing)."""
async def send(self, message: str, **kwargs) -> bool:
"""Log message instead of printing (MCP servers can't use stdout)."""
import sys
# Log to stderr, never stdout (MCP uses stdout for JSON-RPC)
logger.info(f"Console notification: {message}")
return True
class NotificationManager:
"""Manages multiple notification channels."""
def __init__(self):
self.channels: Dict[str, NotificationChannel] = {}
self._init_channels()
def _init_channels(self):
"""Initialize notification channels based on configuration."""
# Add Telegram channel if configured
if settings.telegram_bot_token and settings.telegram_chat_id:
try:
self.channels["telegram"] = TelegramChannel(
bot_token=settings.telegram_bot_token,
chat_id=settings.telegram_chat_id,
)
logger.info("Telegram channel initialized and ready")
except Exception as e:
logger.error(f"Failed to initialize Telegram channel: {e}")
else:
logger.warning("Telegram credentials not found - notifications will only log")
# Add Telegram call channel if configured
if settings.callmebot_username:
try:
self.channels["telegram_call"] = TelegramCallChannel(
username=settings.callmebot_username,
default_lang=settings.callmebot_default_lang,
repeat_count=settings.callmebot_repeat_count,
)
logger.info(
f"Telegram call channel initialized for {settings.callmebot_username}"
)
except Exception as e:
logger.error(f"Failed to initialize Telegram call channel: {e}")
else:
logger.info(
"CallMeBot username not configured - call notifications not available"
)
# Add console channel for fallback (logs to stderr, not stdout)
self.channels["console"] = ConsoleChannel()
async def send_notification(
self,
channel: str,
message: str,
alert: Optional[Any] = None,
) -> bool:
"""Send a notification through specified channel."""
if channel not in self.channels:
logger.warning(
f"Channel '{channel}' not found, falling back to console"
)
channel = "console"
notification_channel = self.channels[channel]
try:
success = await notification_channel.send(message, alert=alert)
return success
except Exception as e:
logger.error(f"Error sending notification via {channel}: {e}")
return False
def add_channel(self, name: str, channel: NotificationChannel):
"""Add a custom notification channel."""
self.channels[name] = channel
logger.info(f"Added notification channel: {name}")
def list_channels(self) -> list[str]:
"""List available notification channels."""
return list(self.channels.keys())