"""Notification system for alerts using Signal Messenger."""
import logging
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from urllib.parse import quote
import httpx
from .config import settings
logger = logging.getLogger(__name__)
class NotificationChannel(ABC):
"""Abstract base class for notification channels."""
@abstractmethod
async def send(self, message: str, **kwargs) -> bool:
"""Send a notification message."""
pass
class SignalMessageChannel(NotificationChannel):
"""Signal Messenger notification channel via CallMeBot API."""
def __init__(self, phone_number: str, api_key: str):
self.phone_number = phone_number
self.api_key = api_key
self.api_url = "https://signal.callmebot.com/signal/send.php"
logger.info(f"Initialized Signal notification channel for {phone_number}")
async def send(self, message: str, **kwargs) -> bool:
"""Send a message via Signal Messenger."""
try:
# URL-encode the message
encoded_message = quote(message)
# Build the API request URL
params = {
"phone": self.phone_number,
"apikey": self.api_key,
"text": encoded_message,
}
# Build query string manually to avoid double encoding
query_parts = [f"{key}={value}" for key, value in params.items()]
url = f"{self.api_url}?{'&'.join(query_parts)}"
# Make HTTP request
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url)
if response.status_code == 200:
logger.info(f"Successfully sent Signal message to {self.phone_number}")
return True
else:
logger.error(
f"CallMeBot Signal API returned status {response.status_code}: {response.text}"
)
return False
except Exception as e:
logger.error(f"Failed to send Signal message: {e}")
return False
class ConsoleChannel(NotificationChannel):
"""Console notification channel (for testing)."""
async def send(self, message: str, **kwargs) -> bool:
"""Log message instead of printing (MCP servers can't use stdout)."""
# Log to stderr, never stdout (MCP uses stdout for JSON-RPC)
logger.info(f"Console notification: {message}")
return True
class NotificationManager:
"""Manages multiple notification channels."""
def __init__(self):
self.channels: Dict[str, NotificationChannel] = {}
self._init_channels()
def _init_channels(self):
"""Initialize notification channels based on configuration."""
# Add Signal channel if configured
if settings.signal_phone_number and settings.signal_api_key:
try:
self.channels["signal"] = SignalMessageChannel(
phone_number=settings.signal_phone_number,
api_key=settings.signal_api_key,
)
logger.info(
f"Signal channel initialized for {settings.signal_phone_number}"
)
except Exception as e:
logger.error(f"Failed to initialize Signal channel: {e}")
else:
logger.warning("Signal credentials not found - notifications will only log")
# Add console channel for fallback (logs to stderr, not stdout)
self.channels["console"] = ConsoleChannel()
async def send_notification(
self,
channel: str,
message: str,
alert: Optional[Any] = None,
) -> bool:
"""Send a notification through specified channel."""
if channel not in self.channels:
logger.warning(
f"Channel '{channel}' not found, falling back to console"
)
channel = "console"
notification_channel = self.channels[channel]
try:
success = await notification_channel.send(message, alert=alert)
return success
except Exception as e:
logger.error(f"Error sending notification via {channel}: {e}")
return False
def add_channel(self, name: str, channel: NotificationChannel):
"""Add a custom notification channel."""
self.channels[name] = channel
logger.info(f"Added notification channel: {name}")
def list_channels(self) -> list[str]:
"""List available notification channels."""
return list(self.channels.keys())