webhook_template.j2 • 13.7 kB
{# Jinja2 template for webhook handling functionality #}
import asyncio
import json
import logging
import time
from typing import Dict, List, Optional, Any, Callable
import httpx
from fastapi import BackgroundTasks
# In-memory registry of webhook subscriptions (one dict per registration).
# Lives only for the lifetime of the process.
registered_webhooks: List[Dict[str, Any]] = []

# In-memory log of webhook deliveries (one dict per delivery record).
webhook_history: List[Dict[str, Any]] = []

# Logging: INFO and above on the root handler, emitted through a named logger
# so webhook messages can be filtered separately.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("webhook_handler")
async def send_webhook(
    url: str,
    payload: Dict[str, Any],
    headers: Optional[Dict[str, str]] = None,
    max_retries: int = 3,
    retry_delay: int = 2
) -> Dict[str, Any]:
    """
    Send a webhook notification to the specified URL.

    A delivery record is appended to ``webhook_history`` before the first
    attempt and updated in place as attempts are made; the same dict is
    returned. Exceptions raised while making an attempt are caught, logged
    and stored in the record instead of propagating.

    Args:
        url: The webhook endpoint URL
        payload: The data to send as JSON
        headers: Optional HTTP headers; merged over the default
            Content-Type / User-Agent headers
        max_retries: Maximum number of delivery attempts in total. With a
            value below 1 no request is made and the record is returned
            with status "failed".
        retry_delay: Base delay in seconds; the wait before the next
            attempt is ``retry_delay * attempt`` (linear backoff)

    Returns:
        The delivery record: id, url, payload, headers, timestamp,
        status ("success" or "failed"), attempts, and the last response
        (or error) details
    """
    request_headers = {
        "Content-Type": "application/json",
        "User-Agent": "MockLoop-Webhook/1.0"
    }
    if headers:
        request_headers.update(headers)

    # Create delivery ID and record for history
    delivery_id = f"wh_{int(time.time())}_{len(webhook_history) + 1}"
    delivery_record = {
        "id": delivery_id,
        "url": url,
        "payload": payload,
        "headers": request_headers,
        "timestamp": time.time(),
        "status": "pending",
        "attempts": 0,
        "response": None
    }
    webhook_history.append(delivery_record)

    # Timeouts, rate limiting and server-side errors are worth retrying;
    # any other non-2xx status is treated as a permanent failure.
    retryable_statuses = (408, 429, 500, 502, 503, 504)

    attempt = 0
    while attempt < max_retries:
        attempt += 1
        delivery_record["attempts"] = attempt
        # Monotonic clock: elapsed time must not be affected by wall-clock
        # adjustments.
        attempt_started = time.monotonic()
        try:
            logger.info(f"Sending webhook {delivery_id} to {url} (attempt {attempt}/{max_retries})")
            async with httpx.AsyncClient(timeout=10.0) as client:
                request_started = time.monotonic()
                response = await client.post(
                    url,
                    json=payload,
                    headers=request_headers
                )
                elapsed_ms = int((time.monotonic() - request_started) * 1000)

            delivery_record["response"] = {
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "body": response.text[:1000],  # Truncate response if too large
                "elapsed_ms": elapsed_ms
            }

            if 200 <= response.status_code < 300:
                delivery_record["status"] = "success"
                logger.info(f"Webhook {delivery_id} delivered successfully in {elapsed_ms}ms")
                return delivery_record

            logger.warning(f"Webhook {delivery_id} failed with status {response.status_code}")
            if response.status_code not in retryable_statuses:
                # Won't retry client errors except those listed above
                delivery_record["status"] = "failed"
                return delivery_record
        except Exception as e:
            logger.error(f"Webhook {delivery_id} error: {str(e)}")
            delivery_record["response"] = {
                "error": str(e),
                # Time spent on the failed attempt (e.g. waiting for a timeout)
                "elapsed_ms": int((time.monotonic() - attempt_started) * 1000)
            }

        # Linear backoff before the next attempt. Skipped after the final
        # attempt: sleeping there would only delay reporting the failure.
        if attempt < max_retries:
            await asyncio.sleep(retry_delay * attempt)

    # If we've exhausted retries
    delivery_record["status"] = "failed"
    return delivery_record
def register_webhook(
    event_type: str,
    url: str,
    description: Optional[str] = None,
    secret: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None
) -> Dict[str, Any]:
    """
    Register a new webhook subscriber.

    The new configuration is appended to ``registered_webhooks`` and marked
    active.

    Args:
        event_type: The type of event to subscribe to
        url: The URL to send webhook notifications to
        description: Optional description of this webhook; defaults to
            "Webhook for <event_type> events"
        secret: Optional secret for signing webhook payloads (stored only;
            this function does not use it)
        headers: Optional additional headers to include in webhook requests

    Returns:
        The registered webhook configuration
    """
    created_at = time.time()
    timestamp = int(created_at)

    # The ID is built from the current list length, which shrinks when a
    # webhook is deleted. Within the same second that could reproduce an ID
    # that is still in use, so advance the sequence number until it is free.
    ids_in_use = {w["id"] for w in registered_webhooks}
    sequence = len(registered_webhooks) + 1
    webhook_id = f"hook_{sequence}_{timestamp}"
    while webhook_id in ids_in_use:
        sequence += 1
        webhook_id = f"hook_{sequence}_{timestamp}"

    webhook = {
        "id": webhook_id,
        "event_type": event_type,
        "url": url,
        "description": description or f"Webhook for {event_type} events",
        "created_at": created_at,
        "secret": secret,
        "headers": headers or {},
        "active": True
    }
    registered_webhooks.append(webhook)
    logger.info(f"Registered new webhook {webhook_id} for event type '{event_type}' to {url}")
    return webhook
async def trigger_webhooks(
    event_type: str,
    payload: Dict[str, Any],
    background_tasks: BackgroundTasks
) -> List[str]:
    """
    Queue delivery of an event to every active webhook subscribed to it.

    The caller's payload is wrapped in an envelope carrying the event type
    and the current time; one ``send_webhook`` task per matching webhook is
    added to ``background_tasks``, so nothing is sent before this returns.

    Args:
        event_type: The type of event that occurred
        payload: The data to send to webhook subscribers
        background_tasks: FastAPI BackgroundTasks that will run the deliveries

    Returns:
        IDs of the webhooks for which a delivery was queued
    """
    # The same envelope object is shared by all queued deliveries
    envelope = {
        "event_type": event_type,
        "timestamp": time.time(),
        "data": payload
    }

    queued_ids = []
    for hook in registered_webhooks:
        # Skip inactive webhooks and subscriptions to other events
        if not hook["active"] or hook["event_type"] != event_type:
            continue

        hook_id = hook["id"]
        queued_ids.append(hook_id)
        # Deliver in the background so the response is not blocked
        background_tasks.add_task(
            send_webhook,
            url=hook["url"],
            payload=envelope,
            headers=hook["headers"]
        )
        logger.info(f"Queued webhook {hook_id} for delivery")

    return queued_ids
def get_webhooks() -> List[Dict[str, Any]]:
    """
    Return every registered webhook configuration.

    Returns:
        The module-level ``registered_webhooks`` list itself (not a copy),
        so changes made by the caller affect the registry
    """
    all_hooks = registered_webhooks
    return all_hooks
def delete_webhook(webhook_id: str) -> Dict[str, Any]:
    """
    Delete a registered webhook by ID.

    Removes the first entry in ``registered_webhooks`` whose "id" equals
    ``webhook_id``. A missing ID is not an error: it is logged as a warning
    and reported in the returned message.

    Args:
        webhook_id: ID of the webhook to delete

    Returns:
        A dict with a single "message" key describing the outcome
    """
    # The list is mutated in place (never rebound), so no ``global`` is needed.
    for index, webhook in enumerate(registered_webhooks):
        if webhook["id"] == webhook_id:
            # Safe to delete while iterating because we return immediately
            del registered_webhooks[index]
            logger.info(f"Deleted webhook {webhook_id}")
            return {"message": f"Webhook {webhook_id} deleted successfully"}

    # If we get here, no webhook was found
    logger.warning(f"Webhook {webhook_id} not found for deletion")
    return {"message": f"Webhook {webhook_id} not found"}
def get_webhook_history(
    limit: int = 50,
    webhook_id: Optional[str] = None,
    status: Optional[str] = None
) -> List[Dict[str, Any]]:
    """
    Get webhook delivery history, optionally filtered.

    Args:
        limit: Maximum number of records to return; a negative value
            yields an empty list
        webhook_id: Optional filter by webhook ID. A record matches when
            its "webhook_id" field equals this value. Records without that
            field match when their "url" equals the URL of the currently
            registered webhook with this ID, so webhooks sharing one URL
            also share those records.
        status: Optional filter by delivery status

    Returns:
        Matching delivery records, newest first (a new list; the records
        themselves are the stored dicts)
    """
    records = list(webhook_history)

    if webhook_id:
        # Delivery records carry the target URL but not necessarily the ID
        # of the webhook they were sent for, so resolve the ID to its URL
        # as a fallback instead of matching nothing.
        target_urls = {w["url"] for w in registered_webhooks if w["id"] == webhook_id}
        records = [
            h for h in records
            if h.get("webhook_id") == webhook_id
            or ("webhook_id" not in h and h.get("url") in target_urls)
        ]

    if status:
        records = [h for h in records if h.get("status") == status]

    # Sort by timestamp descending (newest first)
    records.sort(key=lambda h: h.get("timestamp", 0), reverse=True)

    # A negative slice bound would silently drop records from the end
    if limit is not None and limit < 0:
        limit = 0
    return records[:limit]
def _sample_event_data(event_type: str) -> Dict[str, Any]:
    """Return canned sample data for a known event type, else a generic test stub."""
    samples = {
        "data.created": {
            "id": "test_123",
            "name": "Test Item",
            "email": "test@example.com",
            "created_at": time.time()
        },
        "data.updated": {
            "id": "test_123",
            "name": "Updated Test Item",
            "email": "updated@example.com",
            "updated_at": time.time()
        },
        "data.deleted": {
            "id": "test_123",
            "deleted_at": time.time()
        },
        "auth.login": {
            "user_id": "test_user_123",
            "username": "testuser",
            "login_time": time.time(),
            "ip_address": "192.168.1.100"
        }
    }
    fallback = {
        "test": True,
        "message": f"Test webhook for {event_type} event",
        "timestamp": time.time()
    }
    return samples.get(event_type, fallback)


def _build_test_payload(
    webhook_url: str,
    event_type: str,
    sample: Dict[str, Any]
) -> Dict[str, Any]:
    """Shape the test payload for the service guessed from substrings of the URL."""
    url = webhook_url.lower()

    if "mattermost" in url or "mm." in url:
        # Mattermost-specific format (very strict)
        return {
            "text": f"🧪 **Webhook Test** - {event_type} event from MockLoop\n\n**Event Details:**\n- Type: {event_type}\n- Test Data: {json.dumps(sample, indent=2)[:200]}...",
            "username": "MockLoop",
            "icon_emoji": ":test_tube:",
            "channel": "#general"
        }

    if "slack" in url:
        # Slack-specific format
        return {
            "text": f"🧪 Webhook Test - {event_type} event from MockLoop",
            "username": "MockLoop",
            "icon_emoji": ":test_tube:",
            "attachments": [
                {
                    "color": "good",
                    "title": "Webhook Test Details",
                    "fields": [
                        {
                            "title": "Event Type",
                            "value": event_type,
                            "short": True
                        },
                        {
                            "title": "Test Data",
                            "value": f"```{json.dumps(sample, indent=2)[:300]}```",
                            "short": False
                        }
                    ]
                }
            ]
        }

    if "discord" in url:
        # Discord-specific format
        return {
            "content": f"🧪 Webhook Test: {event_type} event from MockLoop",
            "username": "MockLoop",
            "embeds": [
                {
                    "title": "Webhook Test",
                    "description": f"Testing {event_type} event",
                    "color": 3447003,
                    "fields": [
                        {
                            "name": "Event Type",
                            "value": event_type,
                            "inline": True
                        },
                        {
                            "name": "Test Data",
                            "value": f"```json\n{json.dumps(sample, indent=2)[:500]}\n```",
                            "inline": False
                        }
                    ],
                    "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime())
                }
            ]
        }

    if "httpbin.org" in url:
        # HTTPBin format (accepts anything, good for testing)
        return {
            "event_type": event_type,
            "timestamp": time.time(),
            "data": sample,
            "webhook_test": True,
            "source": "MockLoop"
        }

    # Generic webhook format for unknown services
    return {
        "event_type": event_type,
        "timestamp": time.time(),
        "data": sample,
        "text": f"🧪 Webhook Test - {event_type} event from MockLoop",
        "webhook_test": True,
        "source": "MockLoop"
    }


async def test_webhook(webhook_id: str) -> Dict[str, Any]:
    """
    Test a webhook by sending a sample payload.

    Looks the webhook up in ``registered_webhooks``, builds a sample payload
    for its event type in the format of the service guessed from its URL
    (Mattermost, Slack, Discord, httpbin.org or generic) and sends it once
    via ``send_webhook``.

    Args:
        webhook_id: ID of the webhook to test

    Returns:
        On a completed send: "webhook_id", "test_payload", "delivery_result"
        and "status" ("success" or "failed"). If the webhook is unknown or
        sending raises: "webhook_id", "error" and "status" set to "failed".
    """
    # Find the webhook by ID
    webhook = next((w for w in registered_webhooks if w["id"] == webhook_id), None)
    if not webhook:
        return {
            "error": "Webhook not found",
            "webhook_id": webhook_id,
            "status": "failed"
        }

    event_type = webhook["event_type"]
    test_data = _build_test_payload(
        webhook["url"],
        event_type,
        _sample_event_data(event_type)
    )

    # Send the test webhook
    try:
        result = await send_webhook(
            url=webhook["url"],
            payload=test_data,
            headers=webhook.get("headers", {}),
            max_retries=1  # Only try once for tests
        )
        # Return the test result
        return {
            "webhook_id": webhook_id,
            "test_payload": test_data,
            "delivery_result": result,
            "status": "success" if result["status"] == "success" else "failed"
        }
    except Exception as e:
        logger.error(f"Error testing webhook {webhook_id}: {str(e)}")
        return {
            "webhook_id": webhook_id,
            "error": str(e),
            "status": "failed"
        }