WhatsApp MCP Server
by msaelices
Verified
"""Authentication module for WhatsApp MCP Server."""
import logging
import os
from typing import Dict, Tuple, Any
from dotenv import load_dotenv
# Import the WhatsApp API client
from whatsapp_api_client_python.API import GreenApi
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
class WhatsAppClient:
"""WhatsApp client implementation using whatsapp-api-client-python."""
def __init__(self) -> None:
self.is_authenticated = False
self.session_data: Dict[str, Any] = {}
self.client = None
self.qr_code = None
self.state = "DISCONNECTED"
async def initialize(self) -> bool:
"""Initialize the client."""
logger.info("Initializing WhatsApp client")
try:
# Initialize the WhatsApp API client with GreenAPI credentials from environment variables
id_instance = os.getenv("GREENAPI_ID_INSTANCE")
api_token_instance = os.getenv("GREENAPI_API_TOKEN")
if not id_instance or not api_token_instance:
logger.error(
"Missing required environment variables: GREENAPI_ID_INSTANCE or GREENAPI_API_TOKEN"
)
return False
self.client = GreenApi(
idInstance=id_instance, apiTokenInstance=api_token_instance
)
return True
except Exception as e:
logger.error(f"Failed to initialize WhatsApp client: {e}")
return False
class AuthManager:
"""Manager for authentication-related operations."""
def __init__(self) -> None:
self.session: WhatsAppClient | None = None
async def open_session(self) -> Tuple[bool, str]:
"""Open a new session."""
if self.session:
return False, "Session already exists"
client = WhatsAppClient()
success = await client.initialize()
if success:
self.session = client
return True, "Session created successfully"
return False, "Failed to create session"
def is_authenticated(self) -> bool:
"""Check if a session is authenticated."""
return self.session is not None
def get_client(self) -> WhatsAppClient | None:
"""Get the client for a session."""
return self.session
# Create a singleton instance
auth_manager = AuthManager()