Skip to main content
Glama
fcavalcantirj

Zapr WhatsApp MCP Server

server.py21.5 kB
#!/usr/bin/env python3 """ Zapr WhatsApp MCP Server A Model Context Protocol server for WhatsApp messaging via zapr.link """ import asyncio import os import sys import json import aiohttp from mcp.server import Server, NotificationOptions from mcp.server.stdio import stdio_server from mcp.server.models import InitializationOptions from mcp.types import ( Tool, TextContent, Resource, Prompt, PromptArgument ) # Server info SERVER_NAME = "zapr-whatsapp-mcp" SERVER_VERSION = "1.0.0" class ZaprMCPServer: def __init__(self): self.api_host = os.getenv("ZAPR_API_HOST", "https://api.zapr.link") self.session_id = os.getenv("ZAPR_SESSION_ID") if not self.session_id: raise ValueError("ZAPR_SESSION_ID environment variable is required") # Create MCP server self.server = Server(SERVER_NAME) # Register handlers self.setup_handlers() def setup_handlers(self): """Set up MCP protocol handlers""" @self.server.list_tools() async def list_tools() -> list[Tool]: """List available tools""" return [ Tool( name="send_whatsapp", description="Send a WhatsApp message to a single number", inputSchema={ "type": "object", "properties": { "number": { "type": "string", "description": "Recipient phone number (international format)" }, "message": { "type": "string", "description": "Message content to send" } }, "required": ["number", "message"] } ), Tool( name="bulk_send_whatsapp", description="Send WhatsApp messages to multiple numbers", inputSchema={ "type": "object", "properties": { "numbers": { "type": "array", "items": {"type": "string"}, "description": "List of recipient phone numbers" }, "message": { "type": "string", "description": "Message content to send" } }, "required": ["numbers", "message"] } ), Tool( name="get_session_status", description="Check the status of a WhatsApp session", inputSchema={ "type": "object", "properties": {}, "required": [] } ), Tool( name="validate_number", description="Check if a number is valid on WhatsApp", inputSchema={ "type": "object", "properties": { "number": { "type": "string", "description": "Phone number to validate" } }, "required": ["number"] } ), Tool( name="send_message_with_reply", description="Send a WhatsApp message and wait for reply", inputSchema={ "type": "object", "properties": { "number": { "type": "string", "description": "Recipient phone number" }, "message": { "type": "string", "description": "Message to send" }, "num_replies": { "type": "integer", "description": "Number of replies to wait for (1-3)", "default": 1 }, "timeout_secs": { "type": "integer", "description": "Timeout in seconds (default 60)", "default": 60 } }, "required": ["number", "message"] } ), Tool( name="get_recent_messages", description="Get recent WhatsApp messages from the session", inputSchema={ "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of messages to retrieve (default 10)", "default": 10 } }, "required": [] } ), Tool( name="list_prompts", description="List all available prompt templates for WhatsApp messaging", inputSchema={ "type": "object", "properties": {}, "required": [] } ) ] @self.server.call_tool() async def call_tool(name: str, arguments: dict) -> list: """Handle tool calls""" try: # Always add sessionId to arguments if not present if "sessionId" not in arguments: arguments["sessionId"] = self.session_id if name == "send_whatsapp": return await self._send_whatsapp(arguments) elif name == "bulk_send_whatsapp": return await self._bulk_send_whatsapp(arguments) elif name == "get_session_status": return await self._get_session_status() elif name == "validate_number": return await self._validate_number(arguments) elif name == "send_message_with_reply": return await self._send_reply(arguments) elif name == "get_recent_messages": return await self._get_recent_messages(arguments) elif name == "list_prompts": return await self._list_prompts_tool() else: raise ValueError(f"Unknown tool: {name}") except Exception as e: return [TextContent(type="text", text=f"Error: {str(e)}")] @self.server.list_resources() async def list_resources() -> list[Resource]: """List available resources""" return [ Resource( uri="zapr://session/status", name="Session Status", description="Current WhatsApp session status and statistics", mimeType="application/json" ), Resource( uri="zapr://session/info", name="Session Information", description="Detailed session information including limits and plan", mimeType="application/json" ) ] @self.server.read_resource() async def read_resource(uri: str) -> str: """Read a specific resource""" if uri == "zapr://session/status": status = await self._get_session_status() return status[0].text elif uri == "zapr://session/info": return json.dumps({ "session_id": self.session_id, "api_host": self.api_host, "status": "connected", "plan": "free", "limits": { "mcp_daily": 5, "api_daily": 10 } }) else: raise ValueError(f"Unknown resource: {uri}") @self.server.list_prompts() async def list_prompts(): """List available prompts""" print("DEBUG: list_prompts called!", file=sys.stderr) return [ Prompt( name="send_message", description="Send a WhatsApp message to a contact", arguments=[ PromptArgument( name="number", description="The recipient's phone number", required=True ), PromptArgument( name="message", description="The message content", required=True ) ] ), Prompt( name="bulk_message", description="Send the same message to multiple WhatsApp contacts", arguments=[ PromptArgument( name="numbers", description="Comma-separated list of phone numbers", required=True ), PromptArgument( name="message", description="The message content", required=True ) ] ) ] @self.server.get_prompt() async def get_prompt(name: str, arguments: dict): """Get a specific prompt""" if name == "send_message": number = arguments.get("number", "+1234567890") message = arguments.get("message", "Hello from Zapr!") return { "description": "Send a WhatsApp message to a contact", "messages": [ { "role": "user", "content": { "type": "text", "text": f"Please send the following WhatsApp message:\n\nTo: {number}\nMessage: {message}\n\nUse the send_whatsapp tool to send this message." } } ] } elif name == "bulk_message": numbers = arguments.get("numbers", "+1234567890,+0987654321") message = arguments.get("message", "Hello from Zapr!") numbers_list = [n.strip() for n in numbers.split(",")] return { "description": "Send the same message to multiple WhatsApp contacts", "messages": [ { "role": "user", "content": { "type": "text", "text": f"Please send the following message to multiple contacts:\n\nTo: {', '.join(numbers_list)}\nMessage: {message}\n\nUse the bulk_send_whatsapp tool to send this message." } } ] } else: raise ValueError(f"Unknown prompt: {name}") async def _send_whatsapp(self, args: dict) -> list: """Send single WhatsApp message""" payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "send_whatsapp", "arguments": { "sessionId": self.session_id, "number": args["number"], "message": args["message"] } } } result = await self._call_api(payload) return [TextContent(type="text", text=str(result))] async def _bulk_send_whatsapp(self, args: dict) -> list: """Send bulk WhatsApp messages""" payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "bulk_send_whatsapp", "arguments": { "sessionId": self.session_id, "numbers": args["numbers"], "message": args["message"] } } } result = await self._call_api(payload) return [TextContent(type="text", text=str(result))] async def _get_session_status(self) -> list: """Get session status""" payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "get_session_status", "arguments": { "sessionId": self.session_id } } } result = await self._call_api(payload) return [TextContent(type="text", text=str(result))] async def _validate_number(self, args: dict) -> list: """Validate phone number""" payload = { "jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": { "name": "validate_number", "arguments": { "sessionId": self.session_id, "number": args["number"] } } } result = await self._call_api(payload) return [TextContent(type="text", text=str(result))] async def _send_reply(self, args: dict) -> list: """Send message and wait for reply""" # Prepare request for message-with-reply endpoint number = args["number"] message = args["message"] num_replies = args.get("num_replies", 1) timeout_secs = args.get("timeout_secs", 60) # Call the message-with-reply endpoint directly session_id = self.session_id async with aiohttp.ClientSession() as session: async with session.post( f"{self.api_host}/message-with-reply/{session_id}", json={ "number": number, "message": message, "num_replies": num_replies, "timeout_secs": timeout_secs }, headers={ "Content-Type": "application/json", "X-MCP-Request": "true" }, timeout=aiohttp.ClientTimeout(total=timeout_secs + 10) # Add buffer to timeout ) as response: if response.status == 200: try: result = await response.json() # Format the reply nicely if isinstance(result, dict) and result.get("replies"): replies_text = "\n\n📩 Replies received:\n" for i, reply in enumerate(result["replies"], 1): # reply is a string, not an object replies_text += f"{i}. {reply}\n" return [TextContent(type="text", text=f"✅ Message sent to {number}\n{replies_text}")] else: return [TextContent(type="text", text=f"✅ Message sent to {number}\n⏰ No replies received within {timeout_secs} seconds")] except Exception as e: return [TextContent(type="text", text=f"❌ Failed to parse response: {str(e)}")] else: error_text = await response.text() return [TextContent(type="text", text=f"❌ Failed to send message: {error_text}")] async def _get_recent_messages(self, args: dict) -> list: """Get recent messages from the session""" limit = args.get("limit", 10) session_id = self.session_id # Call the messages API to get recent messages async with aiohttp.ClientSession() as session: async with session.get( f"{self.api_host}/{session_id}/recent-messages", params={"limit": limit}, headers={ "X-MCP-Request": "true" }, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: response_data = await response.json() if response_data and response_data.get("messages"): messages = response_data["messages"] messages_text = f"📱 Recent {len(messages)} messages:\n\n" for msg in messages: timestamp = msg.get("timestamp", "") from_number = msg.get("number", "Unknown") # Fixed: use "number" field message_text = msg.get("message", "") is_from_me = msg.get("is_from_me", False) sender = "You" if is_from_me else from_number messages_text += f"[{timestamp}] {sender}: {message_text}\n" return [TextContent(type="text", text=messages_text)] else: return [TextContent(type="text", text="📱 No recent messages found")] else: return [TextContent(type="text", text=f"❌ Failed to get messages: HTTP {response.status}")] async def _list_prompts_tool(self) -> list: """List all available prompt templates as a tool function""" prompts_info = """# 📋 zapr-whatsapp MCP Functions & Examples | Function | Natural Prompt Example | |----------|------------------------| | `send_whatsapp` | "Send a WhatsApp message to +5521981328933 saying 'Hello from zapr!'" | | `bulk_send_whatsapp` | "Send 'Meeting at 3pm today' to these contacts: +5521981328933, +5521988570927" | | `send_message_with_reply` | "Ask +5521981328933 'What's your favorite color?' and wait for their response" | | `get_recent_messages` | "Show me the last 10 WhatsApp messages from this session" | | `get_session_status` | "Check if my WhatsApp Web connection is active" | | `validate_number` | "Is +5521981328933 a valid WhatsApp number?" | | `list_prompts` | "What functions are available in the WhatsApp MCP server?" | **Prompt Templates**: `send_message`, `bulk_message` - Generate structured instructions for messaging workflows.""" return [TextContent(type="text", text=prompts_info)] async def _call_api(self, payload: dict) -> dict: """Call Zapr API with MCP request""" async with aiohttp.ClientSession() as session: async with session.post( f"{self.api_host}/mcp", json=payload, headers={ "Content-Type": "application/json", "X-MCP-Request": "true", "X-Session-ID": self.session_id }, timeout=aiohttp.ClientTimeout(total=30) ) as response: if response.status == 200: return await response.json() else: raise Exception(f"API error {response.status}: {await response.text()}") async def main(): """Main entry point""" try: print("DEBUG: Starting Zapr MCP Server...", file=sys.stderr) server = ZaprMCPServer() print("DEBUG: MCP Server initialized successfully", file=sys.stderr) # Run stdio server async with stdio_server() as (read_stream, write_stream): await server.server.run( read_stream, write_stream, InitializationOptions( server_name=SERVER_NAME, server_version=SERVER_VERSION, capabilities=server.server.get_capabilities( notification_options=NotificationOptions(), experimental_capabilities={} ) ) ) except KeyboardInterrupt: pass except Exception as e: print(f"Error: {e}", file=sys.stderr) raise if __name__ == "__main__": asyncio.run(main())

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/fcavalcantirj/zapr-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server