Skip to main content
Glama
notification_tasks.py7.32 kB
"""Notification Celery tasks. This module contains background tasks for sending notifications via email and webhooks. """ from typing import Any import httpx from src.tasks.celery_app import celery_app from src.utils.logging_utils import get_logger logger = get_logger(__name__) @celery_app.task(bind=True, name="notification.email") def send_email_notification( self, to: str | list[str], subject: str, body: str, html: bool = False, attachments: list[dict[str, Any]] | None = None, ) -> dict[str, Any]: """Send an email notification. Args: self: Celery task instance. to: Recipient email(s). subject: Email subject. body: Email body. html: Whether body is HTML. attachments: Optional file attachments. Returns: Send result. """ logger.info( "Sending email notification", to=to, subject=subject, ) try: # Note: This is a placeholder implementation # In production, integrate with email service (SMTP, SendGrid, etc.) recipients = [to] if isinstance(to, str) else to # Simulate email sending logger.info( "Email sent successfully", recipients=len(recipients), subject=subject, ) return { "success": True, "recipients": recipients, "subject": subject, } except Exception as e: logger.error( "Email notification failed", error=str(e), ) return { "success": False, "error": str(e), } @celery_app.task(bind=True, name="notification.webhook") def send_webhook_notification( self, url: str, payload: dict[str, Any], headers: dict[str, str] | None = None, method: str = "POST", timeout: int = 30, retries: int = 3, ) -> dict[str, Any]: """Send a webhook notification. Args: self: Celery task instance. url: Webhook URL. payload: JSON payload. headers: Optional HTTP headers. method: HTTP method. timeout: Request timeout in seconds. retries: Number of retries on failure. Returns: Webhook result. """ logger.info( "Sending webhook notification", url=url, method=method, ) try: default_headers = { "Content-Type": "application/json", "User-Agent": "MCP-OPENAPI-DOCX-Webhook/1.0", } if headers: default_headers.update(headers) with httpx.Client(timeout=timeout) as client: if method.upper() == "POST": response = client.post(url, json=payload, headers=default_headers) elif method.upper() == "PUT": response = client.put(url, json=payload, headers=default_headers) else: return { "success": False, "error": f"Unsupported method: {method}", } response.raise_for_status() logger.info( "Webhook sent successfully", url=url, status_code=response.status_code, ) return { "success": True, "url": url, "status_code": response.status_code, } except httpx.HTTPStatusError as e: logger.error( "Webhook failed with status error", url=url, status_code=e.response.status_code, ) # Retry on server errors if e.response.status_code >= 500 and self.request.retries < retries: self.retry(countdown=2**self.request.retries) return { "success": False, "error": f"HTTP {e.response.status_code}", "status_code": e.response.status_code, } except httpx.RequestError as e: logger.error( "Webhook request failed", url=url, error=str(e), ) # Retry on network errors if self.request.retries < retries: self.retry(countdown=2**self.request.retries) return { "success": False, "error": str(e), } @celery_app.task(name="notification.document_event") def notify_document_event( event_type: str, document_id: str, user_id: str, data: dict[str, Any] | None = None, ) -> dict[str, Any]: """Notify about document events. Args: event_type: Type of event (created, updated, deleted, etc.). document_id: Document ID. user_id: User who triggered the event. data: Additional event data. Returns: Notification result. """ logger.info( "Processing document event", event_type=event_type, document_id=document_id, user_id=user_id, ) try: # Build event payload # In production, this would: # 1. Look up subscribed webhooks for this document/user # 2. Send notifications to each subscriber # 3. Log to audit trail logger.info( "Document event processed", event_type=event_type, document_id=document_id, ) return { "success": True, "event_type": event_type, "document_id": document_id, } except Exception as e: logger.error( "Document event notification failed", error=str(e), ) return { "success": False, "error": str(e), } @celery_app.task(name="notification.batch") def send_batch_notifications( notifications: list[dict[str, Any]], ) -> dict[str, Any]: """Send multiple notifications in batch. Args: notifications: List of notification configs. Returns: Batch result. """ logger.info( "Processing batch notifications", count=len(notifications), ) results = [] success_count = 0 failure_count = 0 for notif in notifications: notif_type = notif.get("type") try: if notif_type == "email": result = send_email_notification( to=notif["to"], subject=notif["subject"], body=notif["body"], ) elif notif_type == "webhook": result = send_webhook_notification( url=notif["url"], payload=notif["payload"], ) else: result = {"success": False, "error": f"Unknown type: {notif_type}"} if result.get("success"): success_count += 1 else: failure_count += 1 results.append(result) except Exception as e: failure_count += 1 results.append({"success": False, "error": str(e)}) logger.info( "Batch notifications completed", success=success_count, failures=failure_count, ) return { "success": failure_count == 0, "total": len(notifications), "success_count": success_count, "failure_count": failure_count, "results": results, }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/Fu-Jie/MCP-OPENAPI-DOCX'

If you have feedback or need assistance with the MCP directory API, please join our Discord server