Skip to main content
Glama
store.go20.3 kB
package logs import ( "fmt" "regexp" "sort" "strings" "sync" "time" "github.com/standardbeagle/brummer/pkg/events" "github.com/standardbeagle/brummer/pkg/filters" ) type LogLevel int const ( LevelDebug LogLevel = iota LevelInfo LevelWarn LevelError LevelCritical ) // EventBus interface for publishing events type EventBus interface { Publish(event events.Event) } type LogEntry struct { ID string ProcessID string ProcessName string Timestamp time.Time Content string Level LogLevel IsError bool Tags []string Priority int } // CollapsedLogEntry represents a log entry that may contain multiple identical consecutive logs type CollapsedLogEntry struct { LogEntry Count int // Number of times this exact log appeared consecutively FirstSeen time.Time // Timestamp of the first occurrence LastSeen time.Time // Timestamp of the last occurrence IsCollapsed bool // Whether this entry represents collapsed logs } type Store struct { entries []LogEntry byProcess map[string][]int errors []LogEntry // errorContexts removed - now generated on-demand with functional grouping errorParser *ErrorParser groupingConfig GroupingConfig urls []URLEntry urlMap map[string]*URLEntry // Map URL to its entry for deduplication maxEntries int filters []filters.Filter mu sync.RWMutex // Event bus for publishing LogLine events eventBus EventBus // Channel-based async operations addChan chan *addLogRequest closeChan chan struct{} wg sync.WaitGroup } type URLEntry struct { URL string ProxyURL string // Proxy URL if using reverse proxy mode ProcessID string ProcessName string Timestamp time.Time Context string } type addLogRequest struct { processID string processName string content string isError bool result chan *LogEntry } func NewStore(maxEntries int, eventBus EventBus) *Store { s := &Store{ entries: make([]LogEntry, 0, maxEntries), byProcess: make(map[string][]int), errors: make([]LogEntry, 0, 100), // errorContexts removed - generated on-demand errorParser: NewErrorParser(), groupingConfig: DefaultGroupingConfig(), urls: make([]URLEntry, 0, 100), urlMap: make(map[string]*URLEntry), maxEntries: maxEntries, eventBus: eventBus, filters: []filters.Filter{}, addChan: make(chan *addLogRequest, 1000), closeChan: make(chan struct{}), } // Start async worker s.wg.Add(1) go s.processAddRequests() return s } func (s *Store) Add(processID, processName, content string, isError bool) *LogEntry { // For high-frequency operations, try pure async first req := &addLogRequest{ processID: processID, processName: processName, content: content, isError: isError, result: nil, // No result channel for fire-and-forget } // Non-blocking send to channel select { case s.addChan <- req: // Return a dummy entry for async operation (fire-and-forget) return &LogEntry{ ID: fmt.Sprintf("%s-%d", processID, time.Now().UnixNano()), ProcessID: processID, ProcessName: processName, Timestamp: time.Now(), Content: content, IsError: isError, } default: // Channel full, immediate fallback to sync return s.addSync(processID, processName, content, isError) } } func (s *Store) processAddRequests() { defer s.wg.Done() for { select { case req := <-s.addChan: entry := s.addSync(req.processID, req.processName, req.content, req.isError) if req.result != nil { select { case req.result <- entry: default: } } case <-s.closeChan: // Drain remaining requests for { select { case req := <-s.addChan: entry := s.addSync(req.processID, req.processName, req.content, req.isError) if req.result != nil { select { case req.result <- entry: default: } } default: return } } } } } func (s *Store) addSync(processID, processName, content string, isError bool) *LogEntry { s.mu.Lock() defer s.mu.Unlock() entry := LogEntry{ ID: fmt.Sprintf("%s-%d", processID, time.Now().UnixNano()), ProcessID: processID, ProcessName: processName, Timestamp: time.Now(), Content: content, IsError: isError, Level: s.detectLogLevel(content, isError), Tags: s.extractTags(content), Priority: s.calculatePriority(content, isError), } if len(s.entries) >= s.maxEntries { s.entries = s.entries[1:] for pid, indices := range s.byProcess { for i := range indices { indices[i]-- } s.byProcess[pid] = indices } } idx := len(s.entries) s.entries = append(s.entries, entry) s.byProcess[processID] = append(s.byProcess[processID], idx) // Track errors with enhanced parsing if isError || entry.Level >= LevelError { s.errors = append(s.errors, entry) if len(s.errors) > 100 { s.errors = s.errors[1:] } } // Error grouping is now done on-demand using functional approach // No need to process individual entries here // Detect and track URLs (with deduplication) urls := detectURLs(content) for _, url := range urls { // Check if we already have this URL if existing, exists := s.urlMap[url]; exists { // Update the existing entry with the most recent occurrence existing.Timestamp = entry.Timestamp existing.Context = content existing.ProcessID = processID existing.ProcessName = processName } else { // New URL, add it urlEntry := URLEntry{ URL: url, ProcessID: processID, ProcessName: processName, Timestamp: entry.Timestamp, Context: content, } s.urlMap[url] = &urlEntry // Rebuild the urls slice from the map s.rebuildURLsList() } } // Publish LogLine event if eventBus is available if s.eventBus != nil { s.eventBus.Publish(events.Event{ Type: events.LogLine, ProcessID: processID, Data: map[string]interface{}{ "line": content, "isError": isError, "timestamp": entry.Timestamp, "processName": processName, }, }) } return &entry } func (s *Store) detectLogLevel(content string, isError bool) LogLevel { lower := strings.ToLower(content) if isError || strings.Contains(lower, "error") || strings.Contains(lower, "failed") { return LevelError } if strings.Contains(lower, "critical") || strings.Contains(lower, "fatal") { return LevelCritical } if strings.Contains(lower, "warn") || strings.Contains(lower, "warning") { return LevelWarn } if strings.Contains(lower, "debug") { return LevelDebug } return LevelInfo } func (s *Store) extractTags(content string) []string { tags := []string{} lower := strings.ToLower(content) if strings.Contains(lower, "build") { tags = append(tags, "build") } if strings.Contains(lower, "test") { tags = append(tags, "test") } if strings.Contains(lower, "lint") { tags = append(tags, "lint") } if strings.Contains(lower, "compile") { tags = append(tags, "compile") } if strings.Contains(lower, "warning") { tags = append(tags, "warning") } if strings.Contains(lower, "error") { tags = append(tags, "error") } return tags } func (s *Store) calculatePriority(content string, isError bool) int { priority := 0 if isError { priority += 50 } lower := strings.ToLower(content) if strings.Contains(lower, "failed") { priority += 40 } if strings.Contains(lower, "error") { priority += 30 } if strings.Contains(lower, "warning") { priority += 20 } if strings.Contains(lower, "build") { priority += 10 } if strings.Contains(lower, "test") && (strings.Contains(lower, "fail") || strings.Contains(lower, "pass")) { priority += 15 } for _, filter := range s.filters { if filter.Matches(content) { priority += filter.PriorityBoost } } return priority } func (s *Store) GetByProcess(processID string) []LogEntry { s.mu.RLock() defer s.mu.RUnlock() indices, exists := s.byProcess[processID] if !exists { return []LogEntry{} } result := make([]LogEntry, 0, len(indices)) for _, idx := range indices { if idx >= 0 && idx < len(s.entries) { result = append(result, s.entries[idx]) } } return result } func (s *Store) GetAll() []LogEntry { s.mu.RLock() defer s.mu.RUnlock() result := make([]LogEntry, len(s.entries)) copy(result, s.entries) return result } func (s *Store) Search(query string) []LogEntry { s.mu.RLock() defer s.mu.RUnlock() query = strings.ToLower(query) result := []LogEntry{} for _, entry := range s.entries { if strings.Contains(strings.ToLower(entry.Content), query) || strings.Contains(strings.ToLower(entry.ProcessName), query) { result = append(result, entry) } } return result } func (s *Store) GetHighPriority(threshold int) []LogEntry { s.mu.RLock() defer s.mu.RUnlock() result := []LogEntry{} for _, entry := range s.entries { if entry.Priority >= threshold { result = append(result, entry) } } return result } func (s *Store) AddFilter(filter filters.Filter) { s.mu.Lock() defer s.mu.Unlock() s.filters = append(s.filters, filter) } func (s *Store) RemoveFilter(name string) { s.mu.Lock() defer s.mu.Unlock() newFilters := []filters.Filter{} for _, f := range s.filters { if f.Name != name { newFilters = append(newFilters, f) } } s.filters = newFilters } func (s *Store) GetFilters() []filters.Filter { s.mu.RLock() defer s.mu.RUnlock() result := make([]filters.Filter, len(s.filters)) copy(result, s.filters) return result } var urlRegex = regexp.MustCompile(`https?://[^\s<>"{}|\\^\[\]` + "`" + `]+`) var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*m`) // isValidURL checks if a URL is valid and complete func (s *Store) isValidURL(urlStr string) bool { // Must have protocol if !strings.Contains(urlStr, "://") { return false } // Split by protocol parts := strings.SplitN(urlStr, "://", 2) if len(parts) != 2 { return false } // Host part must not be empty hostPart := parts[1] if hostPart == "" { return false } // If there's a colon, it should be followed by a port number or path if idx := strings.Index(hostPart, ":"); idx != -1 { afterColon := hostPart[idx+1:] // Should have something after the colon (port or path) if afterColon == "" || afterColon == "/" { return false } } return true } func detectURLs(content string) []string { // Strip ANSI escape codes before detecting URLs cleanContent := ansiRegex.ReplaceAllString(content, "") matches := urlRegex.FindAllString(cleanContent, -1) validURLs := []string{} for _, url := range matches { // Remove trailing punctuation url = strings.TrimRight(url, ".,;!?)") // Handle trailing colons - these could be incomplete ports or punctuation if strings.HasSuffix(url, ":") { // Count colons after the protocol protocolEnd := strings.Index(url, "://") if protocolEnd >= 0 { afterProtocol := url[protocolEnd+3:] colonCount := strings.Count(afterProtocol, ":") if colonCount == 1 && strings.HasSuffix(afterProtocol, ":") { // This is like "http://localhost:" - incomplete port, skip it continue } else if colonCount > 1 { // Multiple colons like "http://localhost:3000:" - remove trailing url = strings.TrimSuffix(url, ":") } } } // Validate the URL has protocol and host if strings.Contains(url, "://") { parts := strings.SplitN(url, "://", 2) if len(parts) == 2 && len(parts[1]) > 0 { // Additional validation: check if URL ends with bare colon after hostname if strings.HasSuffix(url, ":") && !strings.HasSuffix(url, "://") { // Check if there's anything after the last colon lastColon := strings.LastIndex(url, ":") protocolEnd := strings.Index(url, "://") if lastColon > protocolEnd+2 { // Colon is after the protocol continue // Skip incomplete URLs like http://localhost: } } validURLs = append(validURLs, url) } } } return validURLs } func (s *Store) GetErrors() []LogEntry { s.mu.RLock() defer s.mu.RUnlock() result := make([]LogEntry, len(s.errors)) copy(result, s.errors) return result } func (s *Store) GetURLs() []URLEntry { s.mu.RLock() defer s.mu.RUnlock() result := make([]URLEntry, len(s.urls)) copy(result, s.urls) return result } // Close shuts down the async worker func (s *Store) Close() { // Clean shutdown - no need to finalize clusters since we use functional grouping close(s.closeChan) s.wg.Wait() } func (s *Store) ClearLogs() { s.mu.Lock() defer s.mu.Unlock() s.entries = make([]LogEntry, 0, s.maxEntries) s.byProcess = make(map[string][]int) } func (s *Store) ClearErrors() { s.mu.Lock() defer s.mu.Unlock() s.errors = make([]LogEntry, 0, 100) // errorContexts no longer stored - generated on-demand from entries s.errorParser.ClearErrors() } // ClearLogsForProcess clears all logs for a specific process func (s *Store) ClearLogsForProcess(processName string) { s.mu.Lock() defer s.mu.Unlock() // Create new entries slice without logs from the specified process newEntries := make([]LogEntry, 0, s.maxEntries) for _, entry := range s.entries { if entry.ProcessName != processName { newEntries = append(newEntries, entry) } } s.entries = newEntries // Rebuild the byProcess index s.byProcess = make(map[string][]int) for i, entry := range s.entries { s.byProcess[entry.ProcessID] = append(s.byProcess[entry.ProcessID], i) } // Also clear errors from this process newErrors := make([]LogEntry, 0, 100) for _, err := range s.errors { if err.ProcessName != processName { newErrors = append(newErrors, err) } } s.errors = newErrors // errorContexts no longer stored - they're generated on-demand from entries // No need to clear them separately since they're derived from the filtered entries } // GetErrorContexts returns parsed error contexts with full details func (s *Store) GetErrorContexts() []ErrorContext { // Use functional grouping to generate error contexts on-demand return s.GetErrorContextsFromFunctionalGrouping() } // GetErrorContextsFromFunctionalGrouping generates error contexts using the functional grouping algorithm func (s *Store) GetErrorContextsFromFunctionalGrouping() []ErrorContext { s.mu.RLock() defer s.mu.RUnlock() // Get all log entries for functional grouping allEntries := make([]LogEntry, len(s.entries)) copy(allEntries, s.entries) // Apply functional grouping errorGroups := GroupErrorsByTimeLocality(allEntries, s.groupingConfig) // Convert error groups to error contexts for compatibility var contexts []ErrorContext for _, group := range errorGroups { context := convertErrorGroupToContext(group) contexts = append(contexts, context) } // Limit to recent contexts to prevent memory growth if len(contexts) > 100 { contexts = contexts[len(contexts)-100:] } return contexts } // convertErrorGroupToContext converts an ErrorGroup to ErrorContext for compatibility func convertErrorGroupToContext(group ErrorGroup) ErrorContext { var rawLines []string var stackLines []string var contextLines []string for _, entry := range group.Entries { rawLines = append(rawLines, entry.Content) // Simple heuristics for stack vs context if strings.Contains(entry.Content, " at ") || strings.Contains(entry.Content, ".js:") || strings.Contains(entry.Content, ".ts:") { stackLines = append(stackLines, entry.Content) } else { contextLines = append(contextLines, entry.Content) } } return ErrorContext{ ID: group.ID, ProcessID: group.ProcessID, ProcessName: group.ProcessName, Timestamp: group.StartTime, Type: group.ErrorType, Message: group.Message, Stack: stackLines, Context: contextLines, Severity: group.Severity, Language: "javascript", // Default for now Raw: rawLines, } } // rebuildURLsList rebuilds the urls slice from the urlMap func (s *Store) rebuildURLsList() { s.urls = make([]URLEntry, 0, len(s.urlMap)) for url, urlEntry := range s.urlMap { // Validate URL before including it if s.isValidURL(url) { s.urls = append(s.urls, *urlEntry) } else { // Remove invalid URLs from the map delete(s.urlMap, url) } } // Sort by timestamp (most recent first) using Go's sort package // This is more efficient and less prone to race conditions than bubble sort sort.Slice(s.urls, func(i, j int) bool { return s.urls[i].Timestamp.After(s.urls[j].Timestamp) }) // Keep only the most recent 100 URLs if len(s.urls) > 100 { s.urls = s.urls[:100] // Remove the oldest entries from the map for url := range s.urlMap { found := false for i := 0; i < 100; i++ { if s.urls[i].URL == url { found = true break } } if !found { delete(s.urlMap, url) } } } } // RemoveURLsForProcess removes all URLs associated with a specific process func (s *Store) RemoveURLsForProcess(processID string) { s.mu.Lock() defer s.mu.Unlock() // Remove URLs from the map that belong to this process for url, entry := range s.urlMap { if entry.ProcessID == processID { delete(s.urlMap, url) } } // Rebuild the urls list s.rebuildURLsList() } // UpdateProxyURL updates the proxy URL for a given URL func (s *Store) UpdateProxyURL(originalURL, proxyURL string) { s.mu.Lock() defer s.mu.Unlock() if entry, exists := s.urlMap[originalURL]; exists { entry.ProxyURL = proxyURL // Update the urls list for i := range s.urls { if s.urls[i].URL == originalURL { s.urls[i].ProxyURL = proxyURL break } } } } // DetectURLsInContent detects URLs in the given content without storing them func (s *Store) DetectURLsInContent(content string) []string { return detectURLs(content) } // GetAllCollapsed returns all log entries with consecutive duplicates collapsed func (s *Store) GetAllCollapsed() []CollapsedLogEntry { s.mu.RLock() defer s.mu.RUnlock() return s.collapseConsecutiveDuplicates(s.entries) } // GetByProcessCollapsed returns collapsed log entries for a specific process func (s *Store) GetByProcessCollapsed(processID string) []CollapsedLogEntry { s.mu.RLock() defer s.mu.RUnlock() indices, exists := s.byProcess[processID] if !exists { return []CollapsedLogEntry{} } // Build the log entries for this process entries := make([]LogEntry, 0, len(indices)) for _, idx := range indices { if idx >= 0 && idx < len(s.entries) { entries = append(entries, s.entries[idx]) } } return s.collapseConsecutiveDuplicates(entries) } // collapseConsecutiveDuplicates takes a slice of LogEntry and returns collapsed entries func (s *Store) collapseConsecutiveDuplicates(entries []LogEntry) []CollapsedLogEntry { if len(entries) == 0 { return []CollapsedLogEntry{} } result := make([]CollapsedLogEntry, 0, len(entries)) // Start with the first entry current := CollapsedLogEntry{ LogEntry: entries[0], Count: 1, FirstSeen: entries[0].Timestamp, LastSeen: entries[0].Timestamp, IsCollapsed: false, } for i := 1; i < len(entries); i++ { entry := entries[i] // Check if this entry is identical to the current one (same process and content) if s.areLogsIdentical(current.LogEntry, entry) { // Increment count and update last seen timestamp current.Count++ current.LastSeen = entry.Timestamp current.IsCollapsed = current.Count > 1 } else { // Different log entry, save the current one and start a new one result = append(result, current) current = CollapsedLogEntry{ LogEntry: entry, Count: 1, FirstSeen: entry.Timestamp, LastSeen: entry.Timestamp, IsCollapsed: false, } } } // Add the last entry result = append(result, current) return result } // areLogsIdentical checks if two log entries are identical for collapsing purposes func (s *Store) areLogsIdentical(a, b LogEntry) bool { // Consider logs identical if they have the same process and content // We ignore timestamp and ID since those will naturally be different return a.ProcessID == b.ProcessID && a.ProcessName == b.ProcessName && a.Content == b.Content && a.Level == b.Level && a.IsError == b.IsError }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server