Skip to main content
Glama

MockLoop MCP Server

Official
by MockLoop
webhook_template.j2โ€ข13.7 kB
{# Jinja2 template for webhook handling functionality #} import asyncio import json import logging import time from typing import Dict, List, Optional, Any, Callable import httpx from fastapi import BackgroundTasks # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger("webhook_handler") # Store for registered webhooks registered_webhooks: List[Dict[str, Any]] = [] # Store for webhook delivery history webhook_history: List[Dict[str, Any]] = [] async def send_webhook( url: str, payload: Dict[str, Any], headers: Optional[Dict[str, str]] = None, max_retries: int = 3, retry_delay: int = 2 ) -> Dict[str, Any]: """ Send a webhook notification to the specified URL. Args: url: The webhook endpoint URL payload: The data to send as JSON headers: Optional HTTP headers for the request max_retries: Maximum number of retry attempts retry_delay: Delay between retries in seconds Returns: Dict containing the status of the webhook delivery """ default_headers = { "Content-Type": "application/json", "User-Agent": "MockLoop-Webhook/1.0" } if headers: default_headers.update(headers) # Create delivery ID and record for history delivery_id = f"wh_{int(time.time())}_{len(webhook_history) + 1}" delivery_record = { "id": delivery_id, "url": url, "payload": payload, "headers": default_headers, "timestamp": time.time(), "status": "pending", "attempts": 0, "response": None } webhook_history.append(delivery_record) # Perform the actual webhook delivery with retries attempt = 0 while attempt < max_retries: try: attempt += 1 delivery_record["attempts"] = attempt logger.info(f"Sending webhook {delivery_id} to {url} (attempt {attempt}/{max_retries})") async with httpx.AsyncClient(timeout=10.0) as client: start_time = time.time() response = await client.post( url, json=payload, headers=default_headers ) elapsed_ms = int((time.time() - start_time) * 1000) delivery_record["response"] = { "status_code": response.status_code, "headers": dict(response.headers), "body": response.text[:1000], # Truncate response if too large "elapsed_ms": elapsed_ms } if 200 <= response.status_code < 300: delivery_record["status"] = "success" logger.info(f"Webhook {delivery_id} delivered successfully in {elapsed_ms}ms") return delivery_record else: logger.warning(f"Webhook {delivery_id} failed with status {response.status_code}") # Only retry on certain status codes if response.status_code in [408, 429, 500, 502, 503, 504]: # Will retry await asyncio.sleep(retry_delay * attempt) # Exponential backoff else: # Won't retry client errors except those listed above delivery_record["status"] = "failed" return delivery_record except Exception as e: logger.error(f"Webhook {delivery_id} error: {str(e)}") delivery_record["response"] = { "error": str(e), "elapsed_ms": 0 } await asyncio.sleep(retry_delay * attempt) # Exponential backoff # If we've exhausted retries delivery_record["status"] = "failed" return delivery_record def register_webhook( event_type: str, url: str, description: Optional[str] = None, secret: Optional[str] = None, headers: Optional[Dict[str, str]] = None ) -> Dict[str, Any]: """ Register a new webhook subscriber. Args: event_type: The type of event to subscribe to url: The URL to send webhook notifications to description: Optional description of this webhook secret: Optional secret for signing webhook payloads headers: Optional additional headers to include in webhook requests Returns: The registered webhook configuration """ webhook_id = f"hook_{len(registered_webhooks) + 1}_{int(time.time())}" webhook = { "id": webhook_id, "event_type": event_type, "url": url, "description": description or f"Webhook for {event_type} events", "created_at": time.time(), "secret": secret, "headers": headers or {}, "active": True } registered_webhooks.append(webhook) logger.info(f"Registered new webhook {webhook_id} for event type '{event_type}' to {url}") return webhook async def trigger_webhooks( event_type: str, payload: Dict[str, Any], background_tasks: BackgroundTasks ) -> List[str]: """ Trigger webhooks for a specific event type. Args: event_type: The type of event that occurred payload: The data to send to webhook subscribers background_tasks: FastAPI BackgroundTasks for async processing Returns: List of webhook IDs that were triggered """ triggered_ids = [] # Enhance payload with event metadata enhanced_payload = { "event_type": event_type, "timestamp": time.time(), "data": payload } for webhook in registered_webhooks: if webhook["active"] and webhook["event_type"] == event_type: # Add to background tasks to not block the response webhook_id = webhook["id"] triggered_ids.append(webhook_id) # Schedule webhook delivery as a background task background_tasks.add_task( send_webhook, url=webhook["url"], payload=enhanced_payload, headers=webhook["headers"] ) logger.info(f"Queued webhook {webhook_id} for delivery") return triggered_ids def get_webhooks() -> List[Dict[str, Any]]: """ Get all registered webhooks. Returns: List of registered webhook configurations """ return registered_webhooks def delete_webhook(webhook_id: str) -> Dict[str, Any]: """ Delete a registered webhook by ID. Args: webhook_id: ID of the webhook to delete Returns: A status message """ global registered_webhooks # Find the webhook by ID for i, webhook in enumerate(registered_webhooks): if webhook["id"] == webhook_id: # Remove from the list removed = registered_webhooks.pop(i) logger.info(f"Deleted webhook {webhook_id}") return {"message": f"Webhook {webhook_id} deleted successfully"} # If we get here, no webhook was found logger.warning(f"Webhook {webhook_id} not found for deletion") return {"message": f"Webhook {webhook_id} not found"} def get_webhook_history( limit: int = 50, webhook_id: Optional[str] = None, status: Optional[str] = None ) -> List[Dict[str, Any]]: """ Get webhook delivery history, optionally filtered. Args: limit: Maximum number of records to return webhook_id: Optional filter by webhook ID status: Optional filter by delivery status Returns: List of webhook delivery records """ filtered_history = webhook_history.copy() if webhook_id: filtered_history = [h for h in filtered_history if h.get("webhook_id") == webhook_id] if status: filtered_history = [h for h in filtered_history if h.get("status") == status] # Sort by timestamp descending (newest first) filtered_history.sort(key=lambda x: x.get("timestamp", 0), reverse=True) return filtered_history[:limit] async def test_webhook(webhook_id: str) -> Dict[str, Any]: """ Test a webhook by sending a sample payload. Args: webhook_id: ID of the webhook to test Returns: Test result with delivery status and response details """ # Find the webhook by ID webhook = None for w in registered_webhooks: if w["id"] == webhook_id: webhook = w break if not webhook: return { "error": "Webhook not found", "webhook_id": webhook_id, "status": "failed" } # Detect webhook service type from URL to send appropriate payload webhook_url = webhook["url"].lower() # Create base test data base_data = { "data.created": { "id": "test_123", "name": "Test Item", "email": "test@example.com", "created_at": time.time() }, "data.updated": { "id": "test_123", "name": "Updated Test Item", "email": "updated@example.com", "updated_at": time.time() }, "data.deleted": { "id": "test_123", "deleted_at": time.time() }, "auth.login": { "user_id": "test_user_123", "username": "testuser", "login_time": time.time(), "ip_address": "192.168.1.100" } } base_test_data = base_data.get(webhook["event_type"], { "test": True, "message": f"Test webhook for {webhook['event_type']} event", "timestamp": time.time() }) # Create service-specific payload if "mattermost" in webhook_url or "mm." in webhook_url: # Mattermost-specific format (very strict) test_data = { "text": f"๐Ÿงช **Webhook Test** - {webhook['event_type']} event from MockLoop\n\n**Event Details:**\n- Type: {webhook['event_type']}\n- Test Data: {json.dumps(base_test_data, indent=2)[:200]}...", "username": "MockLoop", "icon_emoji": ":test_tube:", "channel": "#general" } elif "slack" in webhook_url: # Slack-specific format test_data = { "text": f"๐Ÿงช Webhook Test - {webhook['event_type']} event from MockLoop", "username": "MockLoop", "icon_emoji": ":test_tube:", "attachments": [{ "color": "good", "title": "Webhook Test Details", "fields": [ { "title": "Event Type", "value": webhook['event_type'], "short": True }, { "title": "Test Data", "value": f"```{json.dumps(base_test_data, indent=2)[:300]}```", "short": False } ] }] } elif "discord" in webhook_url: # Discord-specific format test_data = { "content": f"๐Ÿงช Webhook Test: {webhook['event_type']} event from MockLoop", "username": "MockLoop", "embeds": [{ "title": "Webhook Test", "description": f"Testing {webhook['event_type']} event", "color": 3447003, "fields": [ { "name": "Event Type", "value": webhook['event_type'], "inline": True }, { "name": "Test Data", "value": f"```json\n{json.dumps(base_test_data, indent=2)[:500]}\n```", "inline": False } ], "timestamp": time.strftime('%Y-%m-%dT%H:%M:%S.000Z', time.gmtime()) }] } elif "httpbin.org" in webhook_url: # HTTPBin format (accepts anything, good for testing) test_data = { "event_type": webhook["event_type"], "timestamp": time.time(), "data": base_test_data, "webhook_test": True, "source": "MockLoop" } else: # Generic webhook format for unknown services test_data = { "event_type": webhook["event_type"], "timestamp": time.time(), "data": base_test_data, "text": f"๐Ÿงช Webhook Test - {webhook['event_type']} event from MockLoop", "webhook_test": True, "source": "MockLoop" } # Send the test webhook try: result = await send_webhook( url=webhook["url"], payload=test_data, headers=webhook.get("headers", {}), max_retries=1 # Only try once for tests ) # Return the test result return { "webhook_id": webhook_id, "test_payload": test_data, "delivery_result": result, "status": "success" if result["status"] == "success" else "failed" } except Exception as e: logger.error(f"Error testing webhook {webhook_id}: {str(e)}") return { "webhook_id": webhook_id, "error": str(e), "status": "failed" }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/MockLoop/mockloop-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server