WhatsApp MCP Server
by msaelices
Verified
"""Group module for WhatsApp MCP Server."""
import asyncio
import json
import logging
import uuid
from datetime import datetime
from typing import Any, Dict, List, Optional
from whatsapp_mcp.models import Contact, Group, Participant
from whatsapp_mcp.modules.auth import auth_manager
logger = logging.getLogger(__name__)
async def create_group(group_name: str, participants: List[str]) -> Group:
"""Create a new WhatsApp group."""
logger.info(f"Creating group {group_name} with {len(participants)} participants")
whatsapp_client = auth_manager.get_client()
if not whatsapp_client:
raise ValueError("Session not found")
if not whatsapp_client.client:
raise ValueError("WhatsApp client not initialized")
if len(participants) < 1:
raise ValueError("Need at least one participant to create a group")
try:
# Format participant phone numbers correctly
formatted_participants = []
for phone in participants:
# Add @c.us suffix if not already present
if not phone.endswith("@c.us"):
formatted_phone = f"{phone}@c.us"
else:
formatted_phone = phone
formatted_participants.append(formatted_phone)
# Prepare the request data for group creation
# Note: The exact API format may vary depending on the WhatsApp API being used
group_data = {"group_name": group_name, "participants": formatted_participants}
logger.debug(f"Creating group with data: {json.dumps(group_data)}")
# Create the group via the WhatsApp API
# The response format may vary depending on the API
response = await asyncio.to_thread(
whatsapp_client.client.create_group, group_data
)
# Parse the response
if not response or not response.get("success", False):
error_msg = (
response.get("error", "Unknown error") if response else "No response"
)
logger.error(f"Failed to create group: {error_msg}")
raise ValueError(f"Failed to create group: {error_msg}")
# Extract group information from response
group_info = response.get("group", {})
group_id = group_info.get("id", f"{uuid.uuid4().hex[:12]}@g.us")
# Create participant objects
participant_objects = []
for i, phone in enumerate(participants):
contact = Contact(
id=formatted_participants[i],
name=f"Participant {i + 1}", # We may not have names initially
phone=phone,
)
participant = Participant(id=contact.id, is_admin=False, contact=contact)
participant_objects.append(participant)
# Create the group object
group = Group(
id=group_id,
name=group_name,
description=group_info.get("description", ""),
owner=group_info.get("owner", "me"),
creation_time=datetime.now().isoformat(),
participants=participant_objects,
)
logger.info(f"Group created with ID {group_id}")
return group
except Exception as e:
logger.error(f"Failed to create group: {e}")
raise ValueError(f"Failed to create group: {str(e)}")
async def get_group_participants(group_id: str) -> List[Participant]:
"""Get the participants of a WhatsApp group."""
logger.info(f"Getting participants for group {group_id}")
whatsapp_client = auth_manager.get_client()
if not whatsapp_client:
raise ValueError("Session not found")
if not whatsapp_client.client:
raise ValueError("WhatsApp client not initialized")
# Validate group ID format
if not group_id.endswith("@g.us"):
raise ValueError("Invalid group ID format. Must end with @g.us")
try:
# Prepare the request data for fetching group participants
request_data = {"group_id": group_id}
logger.debug(f"Fetching participants for group: {group_id}")
# Get the participants via the WhatsApp API
response = await asyncio.to_thread(
whatsapp_client.client.get_group_participants, request_data
)
# Parse the response
if not response or not response.get("success", False):
error_msg = (
response.get("error", "Unknown error") if response else "No response"
)
logger.error(f"Failed to get group participants: {error_msg}")
raise ValueError(f"Failed to get group participants: {error_msg}")
# Extract participants information from response
participants_info = response.get("participants", [])
# Create participant objects
participants = []
for p_info in participants_info:
p_id = p_info.get("id", "")
p_name = p_info.get("name", "Unknown")
p_phone = p_info.get("phone", p_id.replace("@c.us", ""))
p_is_admin = p_info.get("is_admin", False)
contact = Contact(id=p_id, name=p_name, phone=p_phone)
participant = Participant(id=p_id, is_admin=p_is_admin, contact=contact)
participants.append(participant)
return participants
except Exception as e:
logger.error(f"Failed to get group participants: {e}")
raise ValueError(f"Failed to get group participants: {str(e)}")
async def add_participant(group_id: str, participant_phone: str) -> Dict[str, Any]:
"""Add a participant to a WhatsApp group."""
logger.info(f"Adding participant {participant_phone} to group {group_id}")
whatsapp_client = auth_manager.get_client()
if not whatsapp_client:
raise ValueError("Session not found")
if not whatsapp_client.client:
raise ValueError("WhatsApp client not initialized")
# Validate group ID format
if not group_id.endswith("@g.us"):
raise ValueError("Invalid group ID format. Must end with @g.us")
try:
# Format participant phone number correctly
if not participant_phone.endswith("@c.us"):
formatted_phone = f"{participant_phone}@c.us"
else:
formatted_phone = participant_phone
# Prepare the request data
request_data = {"group_id": group_id, "participant": formatted_phone}
logger.debug(f"Adding participant with data: {json.dumps(request_data)}")
# Add the participant via the WhatsApp API
response = await asyncio.to_thread(
whatsapp_client.client.add_group_participant, request_data
)
# Parse the response
if not response or not response.get("success", False):
error_msg = (
response.get("error", "Unknown error") if response else "No response"
)
logger.error(f"Failed to add participant: {error_msg}")
raise ValueError(f"Failed to add participant: {error_msg}")
return {
"success": True,
"group_id": group_id,
"participant": formatted_phone,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Failed to add participant: {e}")
raise ValueError(f"Failed to add participant: {str(e)}")
async def remove_participant(group_id: str, participant_id: str) -> Dict[str, Any]:
"""Remove a participant from a WhatsApp group."""
logger.info(f"Removing participant {participant_id} from group {group_id}")
whatsapp_client = auth_manager.get_client()
if not whatsapp_client:
raise ValueError("Session not found")
if not whatsapp_client.client:
raise ValueError("WhatsApp client not initialized")
# Validate group ID format
if not group_id.endswith("@g.us"):
raise ValueError("Invalid group ID format. Must end with @g.us")
# Validate participant ID format
if not participant_id.endswith("@c.us"):
raise ValueError("Invalid participant ID format. Must end with @c.us")
try:
# Prepare the request data
request_data = {"group_id": group_id, "participant": participant_id}
logger.debug(f"Removing participant with data: {json.dumps(request_data)}")
# Remove the participant via the WhatsApp API
response = await asyncio.to_thread(
whatsapp_client.client.remove_group_participant, request_data
)
# Parse the response
if not response or not response.get("success", False):
error_msg = (
response.get("error", "Unknown error") if response else "No response"
)
logger.error(f"Failed to remove participant: {error_msg}")
raise ValueError(f"Failed to remove participant: {error_msg}")
return {
"success": True,
"group_id": group_id,
"participant": participant_id,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Failed to remove participant: {e}")
raise ValueError(f"Failed to remove participant: {str(e)}")
async def update_group_settings(
group_id: str,
name: Optional[str] = None,
description: Optional[str] = None,
) -> Dict[str, Any]:
"""Update the settings of a WhatsApp group."""
logger.info(f"Updating settings for group {group_id}")
whatsapp_client = auth_manager.get_client()
if not whatsapp_client:
raise ValueError("Session not found")
if not whatsapp_client.client:
raise ValueError("WhatsApp client not initialized")
# Validate group ID format
if not group_id.endswith("@g.us"):
raise ValueError("Invalid group ID format. Must end with @g.us")
# Ensure at least one setting is being updated
if name is None and description is None:
raise ValueError("Must provide at least one setting to update")
try:
# Prepare the request data
request_data = {"group_id": group_id}
if name is not None:
request_data["name"] = name
if description is not None:
request_data["description"] = description
logger.debug(f"Updating group settings with data: {json.dumps(request_data)}")
# Update the group settings via the WhatsApp API
response = await asyncio.to_thread(
whatsapp_client.client.update_group_settings, request_data
)
# Parse the response
if not response or not response.get("success", False):
error_msg = (
response.get("error", "Unknown error") if response else "No response"
)
logger.error(f"Failed to update group settings: {error_msg}")
raise ValueError(f"Failed to update group settings: {error_msg}")
updated_fields = []
if name is not None:
updated_fields.append("name")
if description is not None:
updated_fields.append("description")
return {
"success": True,
"group_id": group_id,
"updated_fields": updated_fields,
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
logger.error(f"Failed to update group settings: {e}")
raise ValueError(f"Failed to update group settings: {str(e)}")