init.go (30.7 kB)
package whatsapp

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"
	"sync/atomic"
	"time"

	"go.mau.fi/whatsmeow/proto/waHistorySync"

	"github.com/aldinokemal/go-whatsapp-web-multidevice/config"
	domainChatStorage "github.com/aldinokemal/go-whatsapp-web-multidevice/domains/chatstorage"
	pkgError "github.com/aldinokemal/go-whatsapp-web-multidevice/pkg/error"
	"github.com/aldinokemal/go-whatsapp-web-multidevice/pkg/utils"
	"github.com/aldinokemal/go-whatsapp-web-multidevice/ui/websocket"
	"github.com/sirupsen/logrus"
	"go.mau.fi/whatsmeow"
	"go.mau.fi/whatsmeow/appstate"
	"go.mau.fi/whatsmeow/proto/waE2E"
	"go.mau.fi/whatsmeow/store"
	"go.mau.fi/whatsmeow/store/sqlstore"
	"go.mau.fi/whatsmeow/types"
	"go.mau.fi/whatsmeow/types/events"
	waLog "go.mau.fi/whatsmeow/util/log"
	"google.golang.org/protobuf/proto"
)

// Type definitions
type ExtractedMedia struct {
	MediaPath string `json:"media_path"`
	MimeType  string `json:"mime_type"`
	Caption   string `json:"caption"`
}

// Global variables
var (
	cli           *whatsmeow.Client
	db            *sqlstore.Container // Add global database reference for cleanup
	keysDB        *sqlstore.Container
	log           waLog.Logger
	historySyncID int32
	startupTime   = time.Now().Unix()
)

// InitWaDB initializes the WhatsApp database connection
func InitWaDB(ctx context.Context, DBURI string) *sqlstore.Container {
	log = waLog.Stdout("Main", config.WhatsappLogLevel, true)
	dbLog := waLog.Stdout("Database", config.WhatsappLogLevel, true)

	storeContainer, err := initDatabase(ctx, dbLog, DBURI)
	if err != nil {
		log.Errorf("Database initialization error: %v", err)
		panic(pkgError.InternalServerError(fmt.Sprintf("Database initialization error: %v", err)))
	}

	return storeContainer
}

// initDatabase creates and returns a database store container based on the configured URI
func initDatabase(ctx context.Context, dbLog waLog.Logger, DBURI string) (*sqlstore.Container, error) {
	if strings.HasPrefix(DBURI, "file:") {
		return sqlstore.New(ctx, "sqlite3", DBURI, dbLog)
	} else if strings.HasPrefix(DBURI, "postgres:") {
		return sqlstore.New(ctx, "postgres", DBURI, dbLog)
	}

	return nil, fmt.Errorf("unknown database type: %s. Currently only sqlite3(file:) and postgres are supported", DBURI)
}

// syncKeysDevice makes sure the keys database holds the same device as the main database
func syncKeysDevice(ctx context.Context, db, keysDB *sqlstore.Container) {
	if keysDB == nil {
		return
	}

	dev, err := db.GetFirstDevice(ctx)
	if err != nil {
		log.Errorf("Failed to get first device: %v", err)
		return
	}

	devs, err := keysDB.GetAllDevices(ctx)
	if err != nil {
		log.Errorf("Failed to get all devices: %v", err)
		return
	}

	found := false
	for _, d := range devs {
		// Compare JID values rather than pointers: each container returns its
		// own *types.JID, so a pointer comparison would never match.
		if d.ID != nil && dev.ID != nil && *d.ID == *dev.ID {
			found = true
			break
		}
		keysDB.DeleteDevice(ctx, d)
	}
	if !found {
		keysDB.PutDevice(ctx, dev)
	}
}

// InitWaCLI initializes the WhatsApp client
func InitWaCLI(ctx context.Context, storeContainer, keysStoreContainer *sqlstore.Container, chatStorageRepo domainChatStorage.IChatStorageRepository) *whatsmeow.Client {
	device, err := storeContainer.GetFirstDevice(ctx)
	if err != nil {
		log.Errorf("Failed to get device: %v", err)
		panic(err)
	}

	if device == nil {
		log.Errorf("No device found")
		panic("No device found")
	}

	// Configure device properties
	osName := fmt.Sprintf("%s %s", config.AppOs, config.AppVersion)
	store.DeviceProps.PlatformType = &config.AppPlatform
	store.DeviceProps.Os = &osName

	// Set global database reference for remote logout cleanup
	db = storeContainer
	keysDB = keysStoreContainer

	// Configure a separated database for accelerating encryption caching
	if keysDB != nil && device.ID != nil {
		innerStore := sqlstore.NewSQLStore(keysStoreContainer, *device.ID)

		syncKeysDevice(ctx, db, keysDB)
		device.Identities = innerStore
		device.Sessions = innerStore
		device.PreKeys = innerStore
		device.SenderKeys = innerStore
		device.MsgSecrets = innerStore
		device.PrivacyTokens = innerStore
	}

	// Create and configure the client
	cli = whatsmeow.NewClient(device, waLog.Stdout("Client", config.WhatsappLogLevel, true))
	cli.EnableAutoReconnect = true
	cli.AutoTrustIdentity = true
	cli.AddEventHandler(func(rawEvt interface{}) {
		handler(ctx, rawEvt, chatStorageRepo)
	})

	return cli
}

// UpdateGlobalClient updates the global cli variable with a new client instance
// This is needed when reinitializing the client after logout to ensure all
// infrastructure code uses the new client instance
func UpdateGlobalClient(newCli *whatsmeow.Client, newDB *sqlstore.Container) {
	cli = newCli
	db = newDB
	log.Infof("Global WhatsApp client updated successfully")
}

// GetClient returns the current global client instance (alias for GetGlobalClient)
func GetClient() *whatsmeow.Client {
	return cli
}

// GetDB returns the current global database container
func GetDB() *sqlstore.Container {
	return db
}

// GetConnectionStatus returns the current connection status of the global client
func GetConnectionStatus() (isConnected bool, isLoggedIn bool, deviceID string) {
	if cli == nil {
		return false, false, ""
	}

	isConnected = cli.IsConnected()
	isLoggedIn = cli.IsLoggedIn()

	if cli.Store != nil && cli.Store.ID != nil {
		deviceID = cli.Store.ID.String()
	}

	return isConnected, isLoggedIn, deviceID
}

// CleanupDatabase removes the database file to prevent foreign key constraint issues
func CleanupDatabase() error {
	dbPath := strings.TrimPrefix(config.DBURI, "file:")
	if strings.Contains(dbPath, "?") {
		dbPath = strings.Split(dbPath, "?")[0]
	}

	logrus.Infof("[CLEANUP] Removing database file: %s", dbPath)
	if err := os.Remove(dbPath); err != nil {
		if !os.IsNotExist(err) {
			logrus.Errorf("[CLEANUP] Error removing database file: %v", err)
			return err
		} else {
			logrus.Info("[CLEANUP] Database file already removed")
		}
	} else {
		logrus.Info("[CLEANUP] Database file removed successfully")
	}
	return nil
}

// CleanupTemporaryFiles removes history files, QR images, and send items
func CleanupTemporaryFiles() error {
	// Clean up history files
	if files, err := filepath.Glob(fmt.Sprintf("./%s/history-*", config.PathStorages)); err == nil {
		for _, f := range files {
			if err := os.Remove(f); err != nil {
				logrus.Errorf("[CLEANUP] Error removing history file %s: %v", f, err)
				return err
			}
		}
		logrus.Info("[CLEANUP] History files cleaned up")
	}

	// Clean up QR images
	if qrImages, err := filepath.Glob(fmt.Sprintf("./%s/scan-*", config.PathQrCode)); err == nil {
		for _, f := range qrImages {
			if err := os.Remove(f); err != nil {
				logrus.Errorf("[CLEANUP] Error removing QR image %s: %v", f, err)
				return err
			}
		}
		logrus.Info("[CLEANUP] QR images cleaned up")
	}

	// Clean up send items
	if sendItems, err := filepath.Glob(fmt.Sprintf("./%s/*", config.PathSendItems)); err == nil {
		for _, f := range sendItems {
			if !strings.Contains(f, ".gitignore") {
				if err := os.Remove(f); err != nil {
					logrus.Errorf("[CLEANUP] Error removing send item %s: %v", f, err)
					return err
				}
			}
		}
		logrus.Info("[CLEANUP] Send items cleaned up")
	}

	return nil
}

// ReinitializeWhatsAppComponents reinitializes database and client components
func ReinitializeWhatsAppComponents(ctx context.Context, chatStorageRepo domainChatStorage.IChatStorageRepository) (*sqlstore.Container, *whatsmeow.Client, error) {
	logrus.Info("[CLEANUP] Reinitializing database and client...")

	newDB := InitWaDB(ctx, config.DBURI)
	if config.DBKeysURI != "" {
		keysDB = InitWaDB(ctx, config.DBKeysURI)
	}
	newCli := InitWaCLI(ctx, newDB, keysDB, chatStorageRepo)

	// Update global references
	db = newDB
	cli = newCli

	logrus.Info("[CLEANUP] Database and client reinitialized successfully")

	return newDB, newCli, nil
}

// PerformCompleteCleanup performs all cleanup operations in the correct order
func PerformCompleteCleanup(ctx context.Context, logPrefix string, chatStorageRepo domainChatStorage.IChatStorageRepository) (*sqlstore.Container, *whatsmeow.Client, error) {
	logrus.Infof("[%s] Starting complete cleanup process...", logPrefix)

	// Disconnect current client if it exists
	if cli != nil {
		cli.Disconnect()
		logrus.Infof("[%s] Client disconnected", logPrefix)
	}

	// Truncate all chatstorage data before other cleanup
	if chatStorageRepo != nil {
		logrus.Infof("[%s] Truncating chatstorage data...", logPrefix)
		if err := chatStorageRepo.TruncateAllDataWithLogging(logPrefix); err != nil {
			logrus.Errorf("[%s] Failed to truncate chatstorage data: %v", logPrefix, err)
			// Continue with cleanup even if chatstorage truncation fails
		}
	}

	// Clean up database
	if err := CleanupDatabase(); err != nil {
		return nil, nil, fmt.Errorf("database cleanup failed: %v", err)
	}

	// Reinitialize components
	newDB, newCli, err := ReinitializeWhatsAppComponents(ctx, chatStorageRepo)
	if err != nil {
		return nil, nil, fmt.Errorf("reinitialization failed: %v", err)
	}

	// Clean up temporary files
	if err := CleanupTemporaryFiles(); err != nil {
		logrus.Errorf("[%s] Temporary file cleanup failed (non-critical): %v", logPrefix, err)
		// Don't return error for file cleanup as it's non-critical
	}

	logrus.Infof("[%s] Complete cleanup process finished successfully", logPrefix)
	logrus.Infof("[%s] Application is ready for next login without restart", logPrefix)

	return newDB, newCli, nil
}

// PerformCleanupAndUpdateGlobals is a convenience function that performs cleanup
// and ensures global client synchronization
func PerformCleanupAndUpdateGlobals(ctx context.Context, logPrefix string, chatStorageRepo domainChatStorage.IChatStorageRepository) (*sqlstore.Container, *whatsmeow.Client, error) {
	newDB, newCli, err := PerformCompleteCleanup(ctx, logPrefix, chatStorageRepo)
	if err != nil {
		return nil, nil, err
	}

	// Ensure global client is properly synchronized
	UpdateGlobalClient(newCli, newDB)

	return newDB, newCli, nil
}

// handleRemoteLogout performs cleanup when user logs out from their phone
func handleRemoteLogout(ctx context.Context, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	logrus.Info("[REMOTE_LOGOUT] User logged out from phone - starting cleanup...")
	logrus.Info("[REMOTE_LOGOUT] This will clear all WhatsApp session data and chat storage")

	// Log database state before cleanup
	if db != nil {
		devices, dbErr := db.GetAllDevices(ctx)
		if dbErr != nil {
			logrus.Errorf("[REMOTE_LOGOUT] Error getting devices before cleanup: %v", dbErr)
		} else {
			logrus.Infof("[REMOTE_LOGOUT] Devices before cleanup: %d found", len(devices))
		}
	}

	// Perform complete cleanup with global client synchronization
	_, _, err := PerformCleanupAndUpdateGlobals(ctx, "REMOTE_LOGOUT", chatStorageRepo)
	if err != nil {
		logrus.Errorf("[REMOTE_LOGOUT] Cleanup failed: %v", err)
		return
	}

	logrus.Info("[REMOTE_LOGOUT] Remote logout cleanup completed successfully")
}

// handler is the main event handler for WhatsApp events
func handler(ctx context.Context, rawEvt any, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	switch evt := rawEvt.(type) {
	case *events.DeleteForMe:
		handleDeleteForMe(ctx, evt, chatStorageRepo)
	case *events.AppStateSyncComplete:
		handleAppStateSyncComplete(ctx, evt)
	case *events.PairSuccess:
		handlePairSuccess(ctx, evt)
	case *events.LoggedOut:
		handleLoggedOut(ctx, chatStorageRepo)
	case *events.Connected, *events.PushNameSetting:
		handleConnectionEvents(ctx)
	case *events.StreamReplaced:
		handleStreamReplaced(ctx)
	case *events.Message:
		handleMessage(ctx, evt, chatStorageRepo)
	case *events.Receipt:
		handleReceipt(ctx, evt)
	case *events.Presence:
		handlePresence(ctx, evt)
	case *events.HistorySync:
		handleHistorySync(ctx, evt, chatStorageRepo)
	case *events.AppState:
		handleAppState(ctx, evt)
	case *events.GroupInfo:
		handleGroupInfo(ctx, evt)
	}
}

// Event handler functions

func handleDeleteForMe(ctx context.Context, evt *events.DeleteForMe, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	log.Infof("Deleted message %s for %s", evt.MessageID, evt.SenderJID.String())

	// Find the message to get its chat JID
	message, err := chatStorageRepo.GetMessageByID(evt.MessageID)
	if err != nil {
		log.Errorf("Failed to find message %s for deletion: %v", evt.MessageID, err)
		return
	}

	if message == nil {
		log.Warnf("Message %s not found in database, skipping deletion", evt.MessageID)
		return
	}

	// Delete the message from database
	if err := chatStorageRepo.DeleteMessage(evt.MessageID, message.ChatJID); err != nil {
		log.Errorf("Failed to delete message %s from database: %v", evt.MessageID, err)
	} else {
		log.Infof("Successfully deleted message %s from database", evt.MessageID)
	}

	// Send webhook notification for delete event
	if len(config.WhatsappWebhook) > 0 {
		go func() {
			if err := forwardDeleteToWebhook(ctx, evt, message); err != nil {
				log.Errorf("Failed to forward delete event to webhook: %v", err)
			}
		}()
	}
}

func handleAppStateSyncComplete(_ context.Context, evt *events.AppStateSyncComplete) {
	if len(cli.Store.PushName) > 0 && evt.Name == appstate.WAPatchCriticalBlock {
		if err := cli.SendPresence(types.PresenceAvailable); err != nil {
			log.Warnf("Failed to send available presence: %v", err)
		} else {
			log.Infof("Marked self as available")
		}
	}
}

func handlePairSuccess(ctx context.Context, evt *events.PairSuccess) {
	websocket.Broadcast <- websocket.BroadcastMessage{
		Code:    "LOGIN_SUCCESS",
		Message: fmt.Sprintf("Successfully paired with %s", evt.ID.String()),
	}
	syncKeysDevice(ctx, db, keysDB)
}

func handleLoggedOut(ctx context.Context, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	logrus.Warn("[REMOTE_LOGOUT] Received LoggedOut event - user logged out from phone")

	// Perform comprehensive cleanup
	handleRemoteLogout(ctx, chatStorageRepo)

	// Broadcast final notification that cleanup is complete and ready for new login
	websocket.Broadcast <- websocket.BroadcastMessage{
		Code:    "LOGOUT_COMPLETE",
		Message: "Remote logout cleanup completed - ready for new login",
		Result:  nil,
	}
}

func handleConnectionEvents(_ context.Context) {
	if len(cli.Store.PushName) == 0 {
		return
	}

	// Send presence available when connecting and when the pushname is changed.
	// This makes sure that outgoing messages always have the right pushname.
	if err := cli.SendPresence(types.PresenceAvailable); err != nil {
		log.Warnf("Failed to send available presence: %v", err)
	} else {
		log.Infof("Marked self as available")
	}
}

func handleStreamReplaced(_ context.Context) {
	os.Exit(0)
}

func handleMessage(ctx context.Context, evt *events.Message, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	// Log message metadata
	metaParts := buildMessageMetaParts(evt)
	log.Infof("Received message %s from %s (%s): %+v",
		evt.Info.ID,
		evt.Info.SourceString(),
		strings.Join(metaParts, ", "),
		evt.Message,
	)

	if err := StoreMessage(ctx, evt); err != nil {
		// Log storage errors to avoid silent failures that could lead to data loss
		log.Errorf("Failed to store incoming message %s: %v", evt.Info.ID, err)
	}

	// Handle image message if present
	handleImageMessage(ctx, evt)

	// Auto-mark message as read if configured
	handleAutoMarkRead(ctx, evt)

	// Handle auto-reply if configured
	handleAutoReply(ctx, evt, chatStorageRepo)

	// Forward to webhook if configured
	handleWebhookForward(ctx, evt)
}

func buildMessageMetaParts(evt *events.Message) []string {
	metaParts := []string{
		fmt.Sprintf("pushname: %s", evt.Info.PushName),
		fmt.Sprintf("timestamp: %s", evt.Info.Timestamp),
	}
	if evt.Info.Type != "" {
		metaParts = append(metaParts, fmt.Sprintf("type: %s", evt.Info.Type))
	}
	if evt.Info.Category != "" {
		metaParts = append(metaParts, fmt.Sprintf("category: %s", evt.Info.Category))
	}
	if evt.IsViewOnce {
		metaParts = append(metaParts, "view once")
	}
	return metaParts
}

func handleImageMessage(ctx context.Context, evt *events.Message) {
	if img := evt.Message.GetImageMessage(); img != nil {
		if path, err := utils.ExtractMedia(ctx, cli, config.PathStorages, img); err != nil {
			log.Errorf("Failed to download image: %v", err)
		} else {
			log.Infof("Image downloaded to %s", path)
		}
	}
}

func handleAutoMarkRead(_ context.Context, evt *events.Message) {
	// Only mark read if auto-mark read is enabled and message is incoming
	if !config.WhatsappAutoMarkRead || evt.Info.IsFromMe {
		return
	}

	// Mark the message as read
	messageIDs := []types.MessageID{evt.Info.ID}
	timestamp := time.Now()
	chat := evt.Info.Chat
	sender := evt.Info.Sender

	if err := cli.MarkRead(messageIDs, timestamp, chat, sender); err != nil {
		log.Warnf("Failed to mark message %s as read: %v", evt.Info.ID, err)
	} else {
		log.Debugf("Marked message %s as read", evt.Info.ID)
	}
}

func handleAutoReply(ctx context.Context, evt *events.Message, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	if config.WhatsappAutoReplyMessage == "" {
		return
	}

	// Skip groups, broadcasts, and self messages
	if utils.IsGroupJID(evt.Info.Chat.String()) || evt.Info.IsIncomingBroadcast() || evt.Info.IsFromMe {
		return
	}

	// Only reply to direct 1:1 chats (e.g., *@s.whatsapp.net)
	if evt.Info.Chat.Server != types.DefaultUserServer {
		return
	}

	// Extra safety: skip any broadcast/status contexts
	source := evt.Info.SourceString()
	if strings.Contains(source, "broadcast") ||
		strings.HasSuffix(evt.Info.Chat.String(), "@broadcast") ||
		strings.HasPrefix(evt.Info.Chat.String(), "status@") {
		return
	}

	// Require actual typed text (not captions or synthetic labels)
	hasText := false

	// Unwrap FutureProof wrappers to access the inner message content first
	innerMsg := evt.Message
	for i := 0; i < 3; i++ { // safeguard against excessively nested wrappers
		if vm := innerMsg.GetViewOnceMessage(); vm != nil && vm.GetMessage() != nil {
			innerMsg = vm.GetMessage()
			continue
		}
		if em := innerMsg.GetEphemeralMessage(); em != nil && em.GetMessage() != nil {
			innerMsg = em.GetMessage()
			continue
		}
		if vm2 := innerMsg.GetViewOnceMessageV2(); vm2 != nil && vm2.GetMessage() != nil {
			innerMsg = vm2.GetMessage()
			continue
		}
		if vm2e := innerMsg.GetViewOnceMessageV2Extension(); vm2e != nil && vm2e.GetMessage() != nil {
			innerMsg = vm2e.GetMessage()
			continue
		}
		break
	}

	// Check for genuine typed text on the unwrapped content
	if conv := innerMsg.GetConversation(); conv != "" {
		hasText = true
	} else if ext := innerMsg.GetExtendedTextMessage(); ext != nil && ext.GetText() != "" {
		hasText = true
	} else if protoMsg := innerMsg.GetProtocolMessage(); protoMsg != nil {
		if edited := protoMsg.GetEditedMessage(); edited != nil {
			if ext := edited.GetExtendedTextMessage(); ext != nil && ext.GetText() != "" {
				hasText = true
			} else if conv := edited.GetConversation(); conv != "" {
				hasText = true
			}
		}
	}

	if !hasText {
		return
	}

	// Format recipient JID
	recipientJID := utils.FormatJID(evt.Info.Sender.String())

	// Send the auto-reply message
	response, err := cli.SendMessage(
		ctx,
		recipientJID,
		&waE2E.Message{Conversation: proto.String(config.WhatsappAutoReplyMessage)},
	)
	if err != nil {
		log.Errorf("Failed to send auto-reply message: %v", err)
		return
	}

	// Store the auto-reply message in chat storage if send was successful
	if chatStorageRepo != nil {
		// Get our own JID as sender
		senderJID := ""
		if cli.Store.ID != nil {
			senderJID = cli.Store.ID.String()
		}

		// Store the sent auto-reply message
		if err := chatStorageRepo.StoreSentMessageWithContext(
			ctx,
			response.ID,                     // Message ID from WhatsApp response
			senderJID,                       // Our JID as sender
			recipientJID.String(),           // Recipient JID
			config.WhatsappAutoReplyMessage, // Auto-reply content
			response.Timestamp,              // Timestamp from response
		); err != nil {
			// Log storage error but don't fail the auto-reply
			log.Errorf("Failed to store auto-reply message in chat storage: %v", err)
		} else {
			log.Debugf("Auto-reply message %s stored successfully in chat storage", response.ID)
		}
	}
}

func handleWebhookForward(ctx context.Context, evt *events.Message) {
	// Skip webhook for specific protocol messages that shouldn't trigger webhooks
	if protocolMessage := evt.Message.GetProtocolMessage(); protocolMessage != nil {
		protocolType := protocolMessage.GetType().String()
		// Skip EPHEMERAL_SYNC_RESPONSE but allow REVOKE and MESSAGE_EDIT
		if protocolType == "EPHEMERAL_SYNC_RESPONSE" {
			log.Debugf("Skipping webhook for EPHEMERAL_SYNC_RESPONSE message")
			return
		}
	}

	if len(config.WhatsappWebhook) > 0 &&
		!strings.Contains(evt.Info.SourceString(), "broadcast") {
		go func(evt *events.Message) {
			if err := forwardMessageToWebhook(ctx, evt); err != nil {
				logrus.Error("Failed forward to webhook: ", err)
			}
		}(evt)
	}
}

func handleReceipt(ctx context.Context, evt *events.Receipt) {
	sendReceipt := false
	switch evt.Type {
	case types.ReceiptTypeRead, types.ReceiptTypeReadSelf:
		sendReceipt = true
		log.Infof("%v was read by %s at %s: %+v", evt.MessageIDs, evt.SourceString(), evt.Timestamp, evt)
	case types.ReceiptTypeDelivered:
		sendReceipt = true
		// Log the whole slice: indexing MessageIDs[0] would panic on an empty receipt
		log.Infof("%v was delivered to %s at %s: %+v", evt.MessageIDs, evt.SourceString(), evt.Timestamp, evt)
	}

	// Forward receipt (ack) event to webhook if configured
	// Note: Receipt events are not rate limited as they are critical for message delivery status
	if len(config.WhatsappWebhook) > 0 && sendReceipt {
		go func(e *events.Receipt) {
			if err := forwardReceiptToWebhook(ctx, e); err != nil {
				logrus.Errorf("Failed to forward ack event to webhook: %v", err)
			}
		}(evt)
	}
}

func handlePresence(_ context.Context, evt *events.Presence) {
	if evt.Unavailable {
		if evt.LastSeen.IsZero() {
			log.Infof("%s is now offline", evt.From)
		} else {
			log.Infof("%s is now offline (last seen: %s)", evt.From, evt.LastSeen)
		}
	} else {
		log.Infof("%s is now online", evt.From)
	}
}

func handleHistorySync(ctx context.Context, evt *events.HistorySync, chatStorageRepo domainChatStorage.IChatStorageRepository) {
	id := atomic.AddInt32(&historySyncID, 1)
	fileName := fmt.Sprintf("%s/history-%d-%s-%d-%s.json",
		config.PathStorages,
		startupTime,
		cli.Store.ID.String(),
		id,
		evt.Data.SyncType.String(),
	)

	file, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE, 0600)
	if err != nil {
		log.Errorf("Failed to open file to write history sync: %v", err)
		return
	}
	defer file.Close()

	enc := json.NewEncoder(file)
	enc.SetIndent("", " ")
	if err = enc.Encode(evt.Data); err != nil {
		log.Errorf("Failed to write history sync: %v", err)
		return
	}

	log.Infof("Wrote history sync to %s", fileName)

	// Process history sync data to database
	if chatStorageRepo != nil {
		if err := processHistorySync(ctx, evt.Data, chatStorageRepo); err != nil {
			log.Errorf("Failed to process history sync to database: %v", err)
		}
	}
}

func handleAppState(_ context.Context, evt *events.AppState) {
	log.Debugf("App state event: %+v / %+v", evt.Index, evt.SyncActionValue)
}

// processHistorySync processes history sync data and stores messages in the database
func processHistorySync(ctx context.Context, data *waHistorySync.HistorySync, chatStorageRepo domainChatStorage.IChatStorageRepository) error {
	if data == nil {
		return nil
	}

	syncType := data.GetSyncType()
	log.Infof("Processing history sync type: %s", syncType.String())

	switch syncType {
	case waHistorySync.HistorySync_INITIAL_BOOTSTRAP, waHistorySync.HistorySync_RECENT:
		// Process conversation messages
		return processConversationMessages(ctx, data, chatStorageRepo)
	case waHistorySync.HistorySync_PUSH_NAME:
		// Process push names to update chat names
		return processPushNames(ctx, data, chatStorageRepo)
	default:
		// Other sync types are not needed for message storage
		log.Debugf("Skipping history sync type: %s", syncType.String())
		return nil
	}
}

// processConversationMessages processes and stores conversation messages from history sync
func processConversationMessages(_ context.Context, data *waHistorySync.HistorySync, chatStorageRepo domainChatStorage.IChatStorageRepository) error {
	conversations := data.GetConversations()
	log.Infof("Processing %d conversations from history sync", len(conversations))

	for _, conv := range conversations {
		chatJID := conv.GetID()
		if chatJID == "" {
			continue
		}

		// Parse JID to get proper format
		jid, err := types.ParseJID(chatJID)
		if err != nil {
			log.Warnf("Failed to parse JID %s: %v", chatJID, err)
			continue
		}

		displayName := conv.GetDisplayName()

		// Get or create chat
		chatName := chatStorageRepo.GetChatNameWithPushName(jid, chatJID, "", displayName)

		// Extract ephemeral expiration from conversation
		ephemeralExpiration := conv.GetEphemeralExpiration()

		// Process messages in the conversation
		messages := conv.GetMessages()
		log.Debugf("Processing %d messages for chat %s", len(messages), chatJID)

		// Collect messages for batch processing
		var messageBatch []*domainChatStorage.Message
		var latestTimestamp time.Time

		for _, histMsg := range messages {
			if histMsg == nil || histMsg.Message == nil {
				continue
			}

			msg := histMsg.Message
			msgKey := msg.GetKey()
			if msgKey == nil {
				continue
			}

			// Skip messages without ID
			messageID := msgKey.GetID()
			if messageID == "" {
				continue
			}

			// Extract message content and media info
			content := utils.ExtractMessageTextFromProto(msg.GetMessage())
			mediaType, filename, url, mediaKey, fileSHA256, fileEncSHA256, fileLength := utils.ExtractMediaInfo(msg.GetMessage())

			// Skip if there's no content and no media
			if content == "" && mediaType == "" {
				continue
			}

			// Determine sender
			sender := ""
			isFromMe := msgKey.GetFromMe()
			if isFromMe {
				// For self-messages, use the full JID format to match regular message processing
				if cli.Store.ID != nil {
					sender = cli.Store.ID.String() // Use full JID instead of just User part
				} else {
					// Skip messages where we can't determine the sender to avoid NOT NULL violations
					log.Warnf("Skipping self-message %s: client ID unavailable", messageID)
					continue
				}
			} else {
				participant := msgKey.GetParticipant()
				if participant != "" {
					// For group messages, participant contains the actual sender
					if senderJID, err := types.ParseJID(participant); err == nil {
						sender = senderJID.String() // Use full JID format for consistency
					} else {
						// Fall back to the raw participant string, which is non-empty here
						sender = participant
					}
				} else {
					// For individual chats, use the chat JID as sender with full format
					sender = jid.String() // Use full JID format for consistency
				}
			}

			// Convert timestamp from Unix seconds to time.Time
			// WhatsApp history sync timestamps are in seconds, not milliseconds
			timestamp := time.Unix(int64(msg.GetMessageTimestamp()), 0)

			// Track latest timestamp
			if timestamp.After(latestTimestamp) {
				latestTimestamp = timestamp
			}

			// Create message object and add to batch
			message := &domainChatStorage.Message{
				ID:            messageID,
				ChatJID:       chatJID,
				Sender:        sender,
				Content:       content,
				Timestamp:     timestamp,
				IsFromMe:      isFromMe,
				MediaType:     mediaType,
				Filename:      filename,
				URL:           url,
				MediaKey:      mediaKey,
				FileSHA256:    fileSHA256,
				FileEncSHA256: fileEncSHA256,
				FileLength:    fileLength,
			}

			messageBatch = append(messageBatch, message)
		}

		// Store or update the chat with latest message time
		if len(messageBatch) > 0 {
			chat := &domainChatStorage.Chat{
				JID:                 chatJID,
				Name:                chatName,
				LastMessageTime:     latestTimestamp,
				EphemeralExpiration: ephemeralExpiration,
			}

			// Store or update the chat
			if err := chatStorageRepo.StoreChat(chat); err != nil {
				log.Warnf("Failed to store chat %s: %v", chatJID, err)
				continue
			}

			// Store messages in batch
			if err := chatStorageRepo.StoreMessagesBatch(messageBatch); err != nil {
				log.Warnf("Failed to store messages batch for chat %s: %v", chatJID, err)
			} else {
				log.Debugf("Stored %d messages for chat %s", len(messageBatch), chatJID)
			}
		}
	}

	return nil
}

// processPushNames processes push names from history sync to update chat names
func processPushNames(_ context.Context, data *waHistorySync.HistorySync, chatStorageRepo domainChatStorage.IChatStorageRepository) error {
	pushnames := data.GetPushnames()
	log.Infof("Processing %d push names from history sync", len(pushnames))

	for _, pushname := range pushnames {
		jidStr := pushname.GetID()
		name := pushname.GetPushname()

		if jidStr == "" || name == "" {
			continue
		}

		// Check if chat exists
		existingChat, err := chatStorageRepo.GetChat(jidStr)
		if err != nil || existingChat == nil {
			// Chat doesn't exist yet, skip
			continue
		}

		// Update chat name if it's different
		if existingChat.Name != name {
			existingChat.Name = name
			if err := chatStorageRepo.StoreChat(existingChat); err != nil {
				log.Warnf("Failed to update chat name for %s: %v", jidStr, err)
			} else {
				log.Debugf("Updated chat name for %s to %s", jidStr, name)
			}
		}
	}

	return nil
}

func handleGroupInfo(ctx context.Context, evt *events.GroupInfo) {
	// Only process events that have actual changes
	hasChanges := len(evt.Join) > 0 || len(evt.Leave) > 0 || len(evt.Promote) > 0 || len(evt.Demote) > 0 ||
		evt.Name != nil || evt.Topic != nil || evt.Locked != nil || evt.Announce != nil

	if !hasChanges {
		return
	}

	// Log group events for debugging
	if len(evt.Join) > 0 {
		log.Infof("Group %s: %d users joined at %s", evt.JID, len(evt.Join), evt.Timestamp)
	}
	if len(evt.Leave) > 0 {
		log.Infof("Group %s: %d users left at %s", evt.JID, len(evt.Leave), evt.Timestamp)
	}
	if len(evt.Promote) > 0 {
		log.Infof("Group %s: %d users promoted at %s", evt.JID, len(evt.Promote), evt.Timestamp)
	}
	if len(evt.Demote) > 0 {
		log.Infof("Group %s: %d users demoted at %s", evt.JID, len(evt.Demote), evt.Timestamp)
	}

	// Forward group info event to webhook if configured
	if len(config.WhatsappWebhook) > 0 {
		go func(e *events.GroupInfo) {
			if err := forwardGroupInfoToWebhook(ctx, e); err != nil {
				logrus.Errorf("Failed to forward group info event to webhook: %v", err)
			}
		}(evt)
	}
}
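
The path handling inside CleanupDatabase can be tried in isolation. The following is a minimal standalone sketch, not part of init.go: `dbPathFromURI` is a hypothetical helper name, and the sample URI is an assumed value standing in for config.DBURI.

```go
package main

import (
	"fmt"
	"strings"
)

// dbPathFromURI is a hypothetical helper that mirrors the string handling in
// CleanupDatabase: strip the "file:" prefix, then drop any "?query" suffix.
func dbPathFromURI(uri string) string {
	path := strings.TrimPrefix(uri, "file:")
	if i := strings.Index(path, "?"); i >= 0 {
		path = path[:i]
	}
	return path
}

func main() {
	// Assumed example value; the real one comes from config.DBURI.
	fmt.Println(dbPathFromURI("file:storages/whatsapp.db?_foreign_keys=on"))
	// prints "storages/whatsapp.db"
}
```

A URI without the "file:" prefix (for example a postgres one) passes through with only its query string removed, so os.Remove would most likely report a missing file, which CleanupDatabase treats as "already removed" rather than as an error.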

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/samihalawa/whatsapp-go-mcp'

If you have feedback or need assistance with the MCP directory API, please join our Discord server