# email_sender.py
"""Mailjet email sending functionality"""
import httpx
import base64
import json
import re
class EmailSender:
    """Send transactional emails through the Mailjet Send API (v3.1).

    The instance stores the API key pair and the API base URL; every call to
    :meth:`send_single_email` opens its own short-lived ``httpx.AsyncClient``.
    """

    # Compiled once; used with ``fullmatch`` so the whole string must match.
    # (``re.match`` with a trailing ``$`` would also accept "a@b.com\n".)
    _EMAIL_PATTERN = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")

    # Upper bound, in seconds, passed to httpx for the send request.
    _REQUEST_TIMEOUT_SECONDS = 30.0

    def __init__(self, api_key: str, secret_key: str):
        """Store the Mailjet credentials used for HTTP Basic authentication.

        Args:
            api_key: Mailjet API key (the Basic-auth user name).
            secret_key: Mailjet secret key (the Basic-auth password).
        """
        self.api_key = api_key
        self.secret_key = secret_key
        self.base_url = "https://api.mailjet.com/v3.1"

    def _validate_email(self, email: str) -> bool:
        """Return True when *email* has a basic ``local@domain.tld`` shape.

        This is a format check only. Non-string input is reported as invalid
        instead of raising ``TypeError``.
        """
        if not isinstance(email, str):
            return False
        return self._EMAIL_PATTERN.fullmatch(email) is not None

    @staticmethod
    def _build_payload(email_data: dict) -> dict:
        """Translate *email_data* into the body of a Mailjet send request."""
        message = {
            "From": {
                "Email": email_data["from_email"],
                "Name": email_data.get("from_name", ""),
            },
            "To": [
                {
                    "Email": email_data["to_email"],
                    "Name": email_data.get("to_name", ""),
                }
            ],
            "Subject": email_data["subject"],
            "TextPart": email_data["text_content"],
        }
        # The HTML part is optional; it is only sent when non-empty.
        if email_data.get("html_content"):
            message["HTMLPart"] = email_data["html_content"]
        return {"Messages": [message]}

    def _build_headers(self) -> dict:
        """Return the request headers, including the HTTP Basic credentials."""
        credentials = f"{self.api_key}:{self.secret_key}".encode("ascii")
        token = base64.b64encode(credentials).decode("ascii")
        return {
            "Authorization": f"Basic {token}",
            "Content-Type": "application/json",
        }

    @staticmethod
    def _describe_bad_request(body) -> str:
        """Pull a human-readable error message out of a decoded 400 body.

        Looks for a top-level ``ErrorMessage`` first, then for the first
        ``ErrorMessage`` nested under ``Messages[*].Errors[*]``. Falls back to
        ``"Bad request"`` when *body* is not a dict or carries no message.
        """
        if not isinstance(body, dict):
            return "Bad request"
        if body.get("ErrorMessage"):
            return body["ErrorMessage"]
        messages = body.get("Messages")
        if isinstance(messages, list):
            for message in messages:
                if not isinstance(message, dict):
                    continue
                for error in message.get("Errors") or []:
                    if isinstance(error, dict) and error.get("ErrorMessage"):
                        return error["ErrorMessage"]
        return "Bad request"

    @staticmethod
    def _parse_send_result(result) -> dict:
        """Interpret the decoded body of a 200 response.

        Returns:
            ``{"success": True, "message_id": ..., "status": "queued"}`` when
            the first message reports ``Status == "success"``. ``message_id``
            is ``None`` if the response lists no recipient entry.

        Raises:
            Exception: if no message status is present, or the first message
                does not report success.
        """
        messages = result.get("Messages") if isinstance(result, dict) else None
        if not messages:
            raise Exception("No message status returned from Mailjet")

        first = messages[0]
        if first.get("Status") == "success":
            # ``or [{}]`` also covers an explicitly empty "To" list, which
            # would otherwise make the ``[0]`` lookup raise IndexError.
            recipients = first.get("To") or [{}]
            return {
                "success": True,
                "message_id": recipients[0].get("MessageID"),
                "status": "queued",
            }

        errors = first.get("Errors", [])
        error_msg = errors[0].get("ErrorMessage", "Unknown error") if errors else "Unknown error"
        raise Exception(f"Email sending failed: {error_msg}")

    async def send_single_email(self, email_data: dict):
        """Send a single email via Mailjet API.

        Args:
            email_data: Mapping with required keys ``from_email``,
                ``to_email``, ``subject`` and ``text_content``, and optional
                keys ``from_name``, ``to_name`` and ``html_content``.

        Returns:
            dict with ``success`` (True), ``message_id`` and ``status``
            (``"queued"``).

        Raises:
            KeyError: if a required key is missing from *email_data*.
            Exception: for an invalid address format, rejected credentials,
                any non-200 response, an unreadable or unsuccessful response
                body, a timeout, or a network failure.
        """
        # Validate email addresses before doing any network work.
        if not self._validate_email(email_data["from_email"]):
            raise Exception(f"Invalid sender email format: {email_data['from_email']}")
        if not self._validate_email(email_data["to_email"]):
            raise Exception(f"Invalid recipient email format: {email_data['to_email']}")

        payload = self._build_payload(email_data)
        headers = self._build_headers()

        # Only the network call sits inside the try; errors raised while
        # interpreting the response below are not transport errors.
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"{self.base_url}/send",
                    headers=headers,
                    json=payload,
                    timeout=self._REQUEST_TIMEOUT_SECONDS,
                )
        except httpx.TimeoutException as exc:
            raise Exception("Email sending timed out") from exc
        except httpx.RequestError as exc:
            raise Exception(f"Network error: {str(exc)}") from exc

        if response.status_code == 401:
            raise Exception("Invalid Mailjet credentials")

        if response.status_code == 400:
            # A 400 body is not guaranteed to be JSON; fall back to the
            # generic message instead of failing while decoding it.
            try:
                body = response.json()
            except ValueError:
                body = None
            raise Exception(f"Mailjet API error: {self._describe_bad_request(body)}")

        if response.status_code != 200:
            raise Exception(f"Mailjet API error ({response.status_code}): {response.text}")

        try:
            result = response.json()
        except ValueError as exc:
            raise Exception("No message status returned from Mailjet") from exc

        # Check if email was successfully queued.
        return self._parse_send_result(result)