Skip to main content
Glama

Telnyx MCP Server

Official
by team-telnyx
server.py (14.1 kB)
"""Lightweight Unix Socket server for handling webhooks without port conflicts.""" from collections import deque from datetime import datetime from http.server import BaseHTTPRequestHandler import json import os from socketserver import UnixStreamServer import tempfile import threading from typing import Any, Dict, List, Optional from ..config import settings from ..utils.logger import get_logger logger = get_logger(__name__) # Store webhook history (most recent first) webhook_history = deque(maxlen=100) # Store last 100 webhooks class UnixSocketHandler(BaseHTTPRequestHandler): """HTTP request handler for Unix domain sockets.""" # Ensure HTTP/1.1 is used consistently protocol_version = "HTTP/1.1" # Override address_string to fix the IndexError with Unix sockets def address_string(self): """Return a string representation of the client address.""" # For Unix sockets, client_address is often a string or None # Just return a placeholder instead of trying to index it return "unix-client" # Override log_message to ensure it doesn't fail with Unix sockets def log_message(self, format, *args): """Log an arbitrary message to the server log.""" # Use a more robust implementation that won't fail with Unix sockets try: message = format % args logger.info( f"[UnixSocketHandler] {self.command} {self.path} - {message}" ) except Exception as e: # Failsafe logging that doesn't rely on string formatting logger.info( f"[UnixSocketHandler] Request processed: {self.command} {self.path}" ) def do_POST(self): """Handle POST requests from the ngrok tunnel.""" try: # Get content length content_length = int(self.headers.get("Content-Length", 0)) if content_length > settings.webhook_max_body_size: self.send_error(413, "Request entity too large") return # Read the request body body = self.rfile.read(content_length) # Process the webhook request self._process_webhook(body) # Prepare a valid JSON response response = json.dumps( { "status": "success", "message": "Webhook received", "timestamp": 
datetime.now().isoformat(), } ) response_bytes = response.encode("utf-8") try: # Send a properly formatted HTTP response with more robust error handling self.send_response(200) # Set headers with proper HTTP/1.1 compliance self.send_header( "Content-Type", "application/json; charset=utf-8" ) self.send_header("Content-Length", str(len(response_bytes))) self.send_header( "Connection", "close" ) # Important for HTTP/1.1 # Add more headers to help with debugging self.send_header("X-Webhook-Handler", "Telnyx-MCP-Unix-Socket") self.send_header("Cache-Control", "no-store, no-cache") self.end_headers() # Write the response in a try-except block try: self.wfile.write(response_bytes) self.wfile.flush() # Ensure the response is sent completely except (BrokenPipeError, ConnectionResetError) as pipe_error: # Client disconnected - log but don't re-raise logger.warning( f"Client disconnected during response: {pipe_error}" ) except Exception as write_error: logger.error(f"Error writing response: {write_error}") # Don't re-raise - we've already processed the webhook except Exception as resp_error: logger.error(f"Error sending HTTP response: {resp_error}") # Don't re-raise - we've already processed the webhook except Exception as e: logger.error(f"Error handling webhook: {str(e)}") # Send a properly formatted error response with enhanced robustness try: error_response = json.dumps( { "status": "error", "message": str(e), "timestamp": datetime.now().isoformat(), } ) error_bytes = error_response.encode("utf-8") self.send_response(500) self.send_header( "Content-Type", "application/json; charset=utf-8" ) self.send_header("Content-Length", str(len(error_bytes))) self.send_header("Connection", "close") self.send_header("X-Webhook-Handler", "Telnyx-MCP-Unix-Socket") self.end_headers() # Write error response with exception handling try: self.wfile.write(error_bytes) self.wfile.flush() except (BrokenPipeError, ConnectionResetError): # Client disconnected - just log it logger.warning("Client 
disconnected during error response") except Exception as write_err: logger.error(f"Error writing error response: {write_err}") except Exception as resp_err: logger.error(f"Failed to send error response: {resp_err}") def do_GET(self): """Handle GET requests for health checks.""" try: # Prepare a valid JSON response response = json.dumps( { "status": "ok", "time": datetime.now().isoformat(), "path": self.path, "webhook_server": "Telnyx-MCP-Unix-Socket", } ) response_bytes = response.encode("utf-8") # Send a properly formatted HTTP response with robust error handling self.send_response(200) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(response_bytes))) self.send_header("Connection", "close") self.send_header("X-Webhook-Handler", "Telnyx-MCP-Unix-Socket") self.send_header("Cache-Control", "no-store, no-cache") self.end_headers() # Write the response with exception handling try: self.wfile.write(response_bytes) self.wfile.flush() # Ensure the response is sent completely except (BrokenPipeError, ConnectionResetError) as pipe_error: # Client disconnected - log but don't re-raise logger.warning( f"Client disconnected during health check response: {pipe_error}" ) except Exception as write_error: logger.error( f"Error writing health check response: {write_error}" ) except Exception as e: logger.error(f"Error handling health check request: {str(e)}") # Try to send an error response try: error_response = json.dumps( {"status": "error", "message": str(e)} ) error_bytes = error_response.encode("utf-8") self.send_response(500) self.send_header( "Content-Type", "application/json; charset=utf-8" ) self.send_header("Content-Length", str(len(error_bytes))) self.send_header("Connection", "close") self.end_headers() self.wfile.write(error_bytes) self.wfile.flush() except Exception: # If that also fails, just log and continue logger.error("Failed to send health check error response") def _process_webhook(self, body: 
bytes) -> None: """Process the webhook request.""" try: # Parse the request body as JSON if body: payload = json.loads(body) # Extract event type from payload - check both root level and nested data # This handles both {event_type: "x"} and {data: {event_type: "x"}} formats if "event_type" in payload: event_type = payload.get("event_type", "unknown") else: # Fall back to checking in data object or default to unknown event_type = payload.get("data", {}).get( "event_type", "unknown" ) # Log the webhook logger.info(f"Received webhook event: {event_type}") logger.debug( f"Webhook payload: {json.dumps(payload, indent=2)}" ) # Store in history webhook_history.appendleft( { "timestamp": datetime.now().isoformat(), "event_type": event_type, "payload": payload, "headers": dict(self.headers), } ) logger.debug( f"Added webhook to history (total: {len(webhook_history)})" ) else: logger.warning("Received empty webhook body") except json.JSONDecodeError: logger.error("Failed to parse webhook payload as JSON") except Exception as e: logger.error(f"Error processing webhook: {str(e)}") raise # Re-raise to allow proper error response class UnixSocketHTTPServer(UnixStreamServer): """HTTP server using Unix domain sockets.""" def __init__(self, server_address, RequestHandlerClass): """Initialize the server with a Unix socket.""" super().__init__(server_address, RequestHandlerClass) self._thread = None self._running = False def start(self): """Start the server in a background thread.""" self._running = True self._thread = threading.Thread(target=self.serve_forever) self._thread.daemon = True self._thread.start() logger.info(f"Unix socket server started on {self.server_address}") def stop(self): """Stop the server.""" if self._running: self._running = False self.shutdown() if self._thread and self._thread.is_alive(): self._thread.join(timeout=5.0) self.server_close() # Clean up the socket file try: if os.path.exists(self.server_address): os.unlink(self.server_address) 
logger.info(f"Removed socket file: {self.server_address}") except OSError as e: logger.warning(f"Error removing socket file: {e}") # Global server instance socket_server = None socket_path = None def generate_socket_path() -> str: """Generate a unique socket path.""" temp_dir = tempfile.mkdtemp(prefix="telnyx_mcp_") socket_name = f"webhook-{os.getpid()}.sock" path = os.path.join(temp_dir, socket_name) return path def get_webhook_history(limit: int = None) -> List[Dict[str, Any]]: """ Get the webhook history. Args: limit: Maximum number of webhooks to return (None for all) Returns: List of webhook events, most recent first """ if limit is None or limit > len(webhook_history): return list(webhook_history) return list(webhook_history)[:limit] def start_webhook_server() -> Optional[str]: """ Start the webhook server using a Unix domain socket. Returns: Optional[str]: The socket path if successful, None otherwise """ global socket_server, socket_path if not settings.webhook_enabled: logger.info("Webhook server is disabled") return None try: # Generate a unique socket path socket_path = generate_socket_path() logger.info(f"Using Unix domain socket at: {socket_path}") # Create the directory if it doesn't exist os.makedirs(os.path.dirname(socket_path), exist_ok=True) # Clean up any existing socket file if os.path.exists(socket_path): try: os.unlink(socket_path) except OSError as e: logger.warning(f"Could not remove existing socket file: {e}") # Create and start the server socket_server = UnixSocketHTTPServer(socket_path, UnixSocketHandler) socket_server.start() # Register cleanup on exit - but avoid signal handlers which won't work in all threads import atexit atexit.register(stop_webhook_server) return socket_path except Exception as e: logger.error(f"Failed to start webhook server: {e}") return None def stop_webhook_server() -> None: """Stop the webhook server.""" global socket_server, socket_path if socket_server: logger.info("Stopping webhook server...") 
socket_server.stop() socket_server = None # Clean up the socket directory if socket_path: try: dir_path = os.path.dirname(socket_path) if os.path.exists(dir_path) and dir_path.startswith( tempfile.gettempdir() ): os.rmdir(dir_path) logger.info(f"Removed socket directory: {dir_path}") except OSError as e: logger.warning(f"Error removing socket directory: {e}") socket_path = None

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/team-telnyx/telnyx-mcp-server'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.