Skip to main content
Glama
polling_watcher.go (6.05 kB)
package discovery

import (
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// PollingWatcher provides file watching through periodic polling.
// This is more reliable than fsnotify for network filesystems.
//
// Watched directories are scanned one level deep: subdirectories are skipped.
// Events and errors are delivered on buffered channels with non-blocking
// sends, so they are dropped when the consumer falls behind.
type PollingWatcher struct {
	mu         sync.RWMutex
	interval   time.Duration
	paths      map[string]bool      // watched path -> true when it is a directory
	fileStates map[string]fileState // last observed state of every tracked file
	events     chan FileEvent
	errors     chan error
	stop       chan struct{}
	stopOnce   sync.Once // makes Stop safe to call more than once
	wg         sync.WaitGroup
}

// fileState is the snapshot of a file used to detect changes between polls.
type fileState struct {
	modTime time.Time
	size    int64
	hash    string // hex MD5 of the content; empty when the file was not hashed
}

// FileEvent describes a single change detected for Path.
type FileEvent struct {
	Path string
	Op   FileOp
}

// FileOp identifies the kind of change carried by a FileEvent.
type FileOp int

// The operations a PollingWatcher can report.
const (
	Create FileOp = iota
	Write
	Remove
)

// String returns the upper-case name of the operation, or "UNKNOWN" for a
// value outside the declared constants.
func (op FileOp) String() string {
	switch op {
	case Create:
		return "CREATE"
	case Write:
		return "WRITE"
	case Remove:
		return "REMOVE"
	default:
		return "UNKNOWN"
	}
}

// NewPollingWatcher creates a new polling-based file watcher.
// Intervals shorter than 100ms are raised to 100ms.
func NewPollingWatcher(interval time.Duration) *PollingWatcher {
	const minInterval = 100 * time.Millisecond
	if interval < minInterval {
		interval = minInterval
	}

	return &PollingWatcher{
		interval:   interval,
		paths:      make(map[string]bool),
		fileStates: make(map[string]fileState),
		events:     make(chan FileEvent, 100),
		errors:     make(chan error, 10),
		stop:       make(chan struct{}),
	}
}

// Add adds a path to watch (can be file or directory).
//
// The current state of the file, or of every regular entry directly inside
// the directory, is recorded so that only later changes produce events.
// The path is registered only when that initial snapshot succeeds; when Add
// returns an error the set of watched paths is left unchanged.
func (pw *PollingWatcher) Add(path string) error {
	pw.mu.Lock()
	defer pw.mu.Unlock()

	// Verify path exists.
	info, err := os.Stat(path)
	if err != nil {
		return fmt.Errorf("stat path: %w", err)
	}

	if info.IsDir() {
		if err := pw.scanDirectoryLocked(path); err != nil {
			return fmt.Errorf("initial scan: %w", err)
		}
		pw.paths[path] = true
		return nil
	}

	state, err := pw.getFileState(path)
	if err != nil {
		return fmt.Errorf("get file state: %w", err)
	}
	pw.fileStates[path] = state
	pw.paths[path] = false
	return nil
}

// Remove removes a path from watching and forgets the file states that were
// tracked only because of it. States still needed by another watch (a file
// that is also watched individually, or a directory that is still watched)
// are kept so the next poll does not report them as new. Removing a path
// that is not watched is a no-op. Remove always returns nil.
func (pw *PollingWatcher) Remove(path string) error {
	pw.mu.Lock()
	defer pw.mu.Unlock()

	// Read the kind BEFORE deleting the entry; afterwards the lookup would
	// always yield false and directories would never be cleaned up.
	isDir, watched := pw.paths[path]
	if !watched {
		return nil
	}
	delete(pw.paths, path)

	if !isDir {
		// Directory scans key their states by the cleaned joined path, so the
		// state is shared with a directory watch only when path is already in
		// cleaned form and its parent is still watched.
		shared := path == filepath.Clean(path) && pw.dirWatchedLocked(filepath.Dir(path))
		if !shared {
			delete(pw.fileStates, path)
		}
		return nil
	}

	// A state may be stored under the directory path itself if the path used
	// to be a file; drop it as well.
	delete(pw.fileStates, path)

	dir := filepath.Clean(path)
	if pw.dirWatchedLocked(dir) {
		// Same directory is still watched under another spelling.
		return nil
	}
	// Only direct children are ever tracked, so comparing the parent
	// directory is sufficient and independent of trailing separators.
	for filePath := range pw.fileStates {
		if filepath.Dir(filePath) != dir {
			continue
		}
		if _, explicit := pw.paths[filePath]; explicit {
			continue // still watched individually
		}
		delete(pw.fileStates, filePath)
	}
	return nil
}

// dirWatchedLocked reports whether dir (in cleaned form) is currently watched
// as a directory under any spelling. The caller must hold pw.mu.
func (pw *PollingWatcher) dirWatchedLocked(dir string) bool {
	for p, isDir := range pw.paths {
		if isDir && filepath.Clean(p) == dir {
			return true
		}
	}
	return false
}

// Start begins watching by launching the polling goroutine.
// It is intended to be called once, before Stop.
func (pw *PollingWatcher) Start() {
	pw.wg.Add(1)
	go pw.pollLoop()
}

// Stop stops the watcher and closes channels. It waits for the polling
// goroutine to exit before closing the event and error channels. Calling
// Stop more than once is safe; only the first call has an effect.
// Stop always returns nil.
func (pw *PollingWatcher) Stop() error {
	pw.stopOnce.Do(func() {
		close(pw.stop)
		pw.wg.Wait()
		close(pw.events)
		close(pw.errors)
	})
	return nil
}

// Events returns the events channel.
func (pw *PollingWatcher) Events() <-chan FileEvent {
	return pw.events
}

// Errors returns the errors channel.
func (pw *PollingWatcher) Errors() <-chan error {
	return pw.errors
}

// pollLoop runs poll on every tick until the stop channel is closed.
func (pw *PollingWatcher) pollLoop() {
	defer pw.wg.Done()

	ticker := time.NewTicker(pw.interval)
	defer ticker.Stop()

	for {
		select {
		case <-pw.stop:
			return
		case <-ticker.C:
			pw.poll()
		}
	}
}

// sendEvent queues an event without blocking; the event is dropped when the
// events buffer is full.
func (pw *PollingWatcher) sendEvent(path string, op FileOp) {
	select {
	case pw.events <- FileEvent{Path: path, Op: op}:
	default:
	}
}

// poll performs one pass over all watched paths, emitting Create/Write events
// for new or changed files and Remove events for tracked files that were not
// seen. It holds pw.mu for the whole pass.
func (pw *PollingWatcher) poll() {
	pw.mu.Lock()
	defer pw.mu.Unlock()

	// Files observed during this pass.
	seenFiles := make(map[string]bool, len(pw.fileStates))
	// Directories that exist but could not be listed this pass (cleaned form).
	unreadableDirs := make(map[string]bool)

	for path, isDir := range pw.paths {
		if !isDir {
			seenFiles[path] = true
			pw.checkFile(path)
			continue
		}

		entries, err := os.ReadDir(path)
		if err != nil {
			select {
			case pw.errors <- fmt.Errorf("read dir %s: %w", path, err):
			default:
			}
			// A failure other than "not exist" says nothing about the files
			// inside, so their states are kept instead of reporting them all
			// as removed (and as created again on the next good poll).
			if !os.IsNotExist(err) {
				unreadableDirs[filepath.Clean(path)] = true
			}
			continue
		}

		for _, entry := range entries {
			if entry.IsDir() {
				continue // Skip subdirectories
			}
			filePath := filepath.Join(path, entry.Name())
			seenFiles[filePath] = true
			pw.checkFile(filePath)
		}
	}

	// Anything tracked but not seen has been removed.
	for filePath := range pw.fileStates {
		if seenFiles[filePath] || unreadableDirs[filepath.Dir(filePath)] {
			continue
		}
		delete(pw.fileStates, filePath)
		pw.sendEvent(filePath, Remove)
	}
}

// checkFile compares the current state of path with the recorded one and
// emits Create, Write or Remove accordingly. Errors other than "not exist"
// are ignored and leave the recorded state untouched. The caller must hold
// pw.mu.
func (pw *PollingWatcher) checkFile(path string) {
	newState, err := pw.getFileState(path)
	if err != nil {
		if os.IsNotExist(err) {
			if _, exists := pw.fileStates[path]; exists {
				delete(pw.fileStates, path)
				pw.sendEvent(path, Remove)
			}
		}
		return
	}

	oldState, exists := pw.fileStates[path]
	if !exists {
		pw.fileStates[path] = newState
		pw.sendEvent(path, Create)
		return
	}

	// time.Time values are compared with Equal rather than ==, which would
	// also compare location and monotonic-clock details.
	changed := !oldState.modTime.Equal(newState.modTime) ||
		oldState.size != newState.size ||
		oldState.hash != newState.hash
	if changed {
		pw.fileStates[path] = newState
		pw.sendEvent(path, Write)
	}
}

// getFileState stats path and returns its modification time and size. Files
// smaller than 1MB are additionally hashed for better change detection; a
// hashing failure is not an error and simply leaves the hash empty.
func (pw *PollingWatcher) getFileState(path string) (fileState, error) {
	const maxHashSize = 1024 * 1024 // 1MB

	info, err := os.Stat(path)
	if err != nil {
		return fileState{}, err
	}

	state := fileState{
		modTime: info.ModTime(),
		size:    info.Size(),
	}

	if info.Size() < maxHashSize {
		if hash, err := pw.hashFile(path); err == nil {
			state.hash = hash
		}
	}

	return state, nil
}

// hashFile returns the hex-encoded MD5 digest of the file's content. MD5 is
// used here only as a change fingerprint.
func (pw *PollingWatcher) hashFile(path string) (string, error) {
	file, err := os.Open(path)
	if err != nil {
		return "", err
	}
	defer file.Close()

	hash := md5.New()
	if _, err := io.Copy(hash, file); err != nil {
		return "", err
	}

	return hex.EncodeToString(hash.Sum(nil)), nil
}

// scanDirectoryLocked records the state of every non-directory entry directly
// inside dir. Entries whose state cannot be read are skipped. The caller must
// hold pw.mu.
func (pw *PollingWatcher) scanDirectoryLocked(dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}

		filePath := filepath.Join(dir, entry.Name())
		state, err := pw.getFileState(filePath)
		if err != nil {
			// Best effort: skip this entry and keep scanning the rest.
			continue
		}
		pw.fileStates[filePath] = state
	}

	return nil
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.