Skip to main content
Glama
instance.go (10.7 kB)
// Package discovery provides file-based instance discovery for brummer.
// It watches a directory for JSON files containing instance metadata and
// maintains an in-memory registry of running instances.
//
// The discovery system is thread-safe and supports concurrent access.
// Callbacks are notified of changes but must not call back into the
// discovery system to avoid deadlocks.
package discovery

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
)

const (
	// DefaultFileMode is the default permission for instance files (owner read/write only)
	DefaultFileMode = 0600

	// DefaultDirMode is the default permission for the instances directory.
	// New passes it to os.MkdirAll when creating the directory.
	DefaultDirMode = 0700

	// StaleInstanceTimeout is the duration after which an instance is considered stale.
	// CleanupStaleInstances compares the time elapsed since Instance.LastPing against it.
	StaleInstanceTimeout = 5 * time.Minute
)

// Instance represents a running brummer instance.
// All fields are required unless otherwise noted.
type Instance struct {
	// ID identifies the instance; the in-memory registry is keyed by it.
	ID string `json:"id"`
	// Name is the instance name; validateInstance rejects an empty value.
	Name string `json:"name"`
	// Directory is the directory associated with the instance; must be non-empty.
	Directory string `json:"directory"`
	// Port must be in the range 1-65535 to pass validateInstance.
	Port int `json:"port"`
	// StartedAt must be set and not more than one minute in the future.
	StartedAt time.Time `json:"started_at"`
	// LastPing must be set and not more than one minute in the future.
	// It drives stale detection in CleanupStaleInstances.
	LastPing time.Time `json:"last_ping"`
	// ProcessInfo describes the process behind the instance.
	ProcessInfo struct {
		// PID is probed by CleanupStaleInstances to check the process still exists.
		PID int `json:"pid"`
		// Executable is not read by the code in this file.
		Executable string `json:"executable"`
	} `json:"process_info"`
}

// Discovery manages instance discovery via file watching.
// It is thread-safe and can be safely accessed concurrently.
type Discovery struct {
	// mu guards instances and updateCallbacks.
	mu sync.RWMutex
	// instances is the in-memory registry, keyed by instance ID.
	instances map[string]*Instance
	// instancesDir is the directory being watched for instance files.
	instancesDir string
	// watcher delivers filesystem events for instancesDir.
	watcher *fsnotify.Watcher
	// updateCallbacks are invoked with a snapshot whenever the registry changes.
	updateCallbacks []func(instances map[string]*Instance)
	// stopCh is closed by Stop to ask the watch goroutine to exit.
	stopCh chan struct{}
	// stoppedCh is closed by the watch goroutine when it has exited.
	stoppedCh chan struct{}
	// atomicOps is used to list, read and unregister instance files.
	atomicOps *AtomicFileOperations
}

// New creates a new instance discovery system.
// The instancesDir will be created if it doesn't exist.
// It also registers a watcher on the directory and performs an initial scan;
// if any of these steps fails, an error is returned and the watcher is closed.
func New(instancesDir string) (*Discovery, error) { // Create instances directory if it doesn't exist if err := os.MkdirAll(instancesDir, DefaultDirMode); err != nil { return nil, fmt.Errorf("failed to create instances directory: %w", err) } watcher, err := fsnotify.NewWatcher() if err != nil { return nil, fmt.Errorf("failed to create file watcher: %w", err) } // Ensure watcher is cleaned up on any error var watcherClosed bool defer func() { if !watcherClosed && err != nil { watcher.Close() } }() d := &Discovery{ instances: make(map[string]*Instance), instancesDir: instancesDir, watcher: watcher, stopCh: make(chan struct{}), stoppedCh: make(chan struct{}), atomicOps: NewAtomicFileOperations(instancesDir), } // Add the instances directory to the watcher if err = watcher.Add(instancesDir); err != nil { return nil, fmt.Errorf("failed to watch instances directory: %w", err) } // Initial scan of existing files if err = d.scanDirectory(); err != nil { return nil, fmt.Errorf("initial directory scan failed: %w", err) } watcherClosed = true // Prevent deferred cleanup return d, nil } // Start begins watching for instance changes. // This method returns immediately and watches in the background. func (d *Discovery) Start() { go d.watch() } // Stop stops the discovery system and waits for the watch goroutine to exit. 
func (d *Discovery) Stop() error { // Signal stop close(d.stopCh) // Close watcher to trigger event loop exit err := d.watcher.Close() // Wait for watch goroutine to complete <-d.stoppedCh return err } // GetInstances returns a copy of the current instances func (d *Discovery) GetInstances() map[string]*Instance { d.mu.RLock() defer d.mu.RUnlock() // Return a copy to prevent concurrent modification result := make(map[string]*Instance) for k, v := range d.instances { result[k] = v } return result } // OnUpdate registers a callback for instance updates func (d *Discovery) OnUpdate(callback func(instances map[string]*Instance)) { d.mu.Lock() defer d.mu.Unlock() d.updateCallbacks = append(d.updateCallbacks, callback) } // scanDirectory scans the instances directory for existing instance files func (d *Discovery) scanDirectory() error { // Use atomic operations to safely list all instances instances, err := d.atomicOps.SafeListInstances() if err != nil { return fmt.Errorf("failed to list instances: %w", err) } // Update in-memory cache with discovered instances d.mu.Lock() for id, instance := range instances { d.instances[id] = instance } d.mu.Unlock() return nil } // watch monitors the instances directory for changes func (d *Discovery) watch() { defer close(d.stoppedCh) for { select { case <-d.stopCh: return case event, ok := <-d.watcher.Events: if !ok { return } if !isInstanceFile(filepath.Base(event.Name)) { continue } switch { case event.Has(fsnotify.Write) || event.Has(fsnotify.Create): if err := d.loadInstance(event.Name); err != nil { // TODO: Add proper logging fmt.Fprintf(os.Stderr, "Failed to load instance %s: %v\n", event.Name, err) } case event.Has(fsnotify.Remove): d.removeInstance(event.Name) } case err, ok := <-d.watcher.Errors: if !ok { return } // TODO: Add proper logging fmt.Fprintf(os.Stderr, "Watcher error: %v\n", err) } } } // loadInstance loads an instance from a file func (d *Discovery) loadInstance(path string) error { // Extract instance ID 
from the filename filename := filepath.Base(path) instanceID := extractInstanceID(filename) if instanceID == "" { return fmt.Errorf("invalid instance filename: %s", filename) } // Use atomic operations to safely read the instance instance, err := d.atomicOps.SafeReadInstance(instanceID) if err != nil { return fmt.Errorf("failed to read instance %s: %w", instanceID, err) } // Get callbacks while holding lock to ensure consistency d.mu.Lock() d.instances[instance.ID] = instance // Create a copy of callbacks to avoid holding lock during callback execution callbacks := make([]func(map[string]*Instance), len(d.updateCallbacks)) copy(callbacks, d.updateCallbacks) // Get instances snapshot while still holding the lock instancesCopy := d.getInstancesLocked() d.mu.Unlock() // Notify callbacks without holding lock for _, callback := range callbacks { callback(instancesCopy) } return nil } // validateInstance validates that an instance has all required fields func validateInstance(inst *Instance) error { if inst.ID == "" { return fmt.Errorf("missing ID") } if inst.Name == "" { return fmt.Errorf("missing Name") } if inst.Directory == "" { return fmt.Errorf("missing Directory") } if inst.Port <= 0 || inst.Port > 65535 { return fmt.Errorf("invalid Port: %d", inst.Port) } if inst.StartedAt.IsZero() { return fmt.Errorf("missing StartedAt") } if inst.LastPing.IsZero() { return fmt.Errorf("missing LastPing") } // Check if timestamps are reasonable (not in the future) now := time.Now() if inst.StartedAt.After(now.Add(time.Minute)) { return fmt.Errorf("StartedAt is in the future") } if inst.LastPing.After(now.Add(time.Minute)) { return fmt.Errorf("LastPing is in the future") } return nil } // getInstancesLocked returns a copy of instances while holding the read lock // Caller must hold at least a read lock func (d *Discovery) getInstancesLocked() map[string]*Instance { result := make(map[string]*Instance, len(d.instances)) for k, v := range d.instances { // Create a copy of the 
instance to prevent external modification instCopy := *v result[k] = &instCopy } return result } // removeInstance removes an instance when its file is deleted func (d *Discovery) removeInstance(path string) { // Extract instance ID from filename filename := filepath.Base(path) instanceID := extractInstanceID(filename) if instanceID == "" { return } d.mu.Lock() // Check if instance exists if _, exists := d.instances[instanceID]; !exists { d.mu.Unlock() return } delete(d.instances, instanceID) // Create a copy of callbacks to avoid holding lock during callback execution callbacks := make([]func(map[string]*Instance), len(d.updateCallbacks)) copy(callbacks, d.updateCallbacks) // Get instances snapshot while still holding the lock instancesCopy := d.getInstancesLocked() d.mu.Unlock() // Notify callbacks without holding lock for _, callback := range callbacks { callback(instancesCopy) } } // isInstanceFile checks if a filename is an instance file func isInstanceFile(name string) bool { return filepath.Ext(name) == ".json" && len(name) > 5 } // extractInstanceID extracts the instance ID from a filename func extractInstanceID(filename string) string { if !isInstanceFile(filename) { return "" } return filename[:len(filename)-5] // Remove .json extension } // CleanupStaleInstances removes instances that haven't been updated recently. // This should be called periodically to clean up after crashed instances. 
func (d *Discovery) CleanupStaleInstances() error { d.mu.Lock() defer d.mu.Unlock() now := time.Now() staleIDs := []string{} // Find stale instances for id, inst := range d.instances { isStale := now.Sub(inst.LastPing) > StaleInstanceTimeout // Also check if the process is actually running if isStale || !d.isProcessRunning(inst.ProcessInfo.PID) { staleIDs = append(staleIDs, id) } } // Remove stale instances using atomic operations for _, id := range staleIDs { // Remove from memory delete(d.instances, id) // Remove file atomically if err := d.atomicOps.SafeUnregisterInstance(id); err != nil { // Log error but continue cleanup fmt.Fprintf(os.Stderr, "Failed to remove stale instance file %s: %v\n", id, err) } } // Notify callbacks if any instances were removed if len(staleIDs) > 0 { callbacks := make([]func(map[string]*Instance), len(d.updateCallbacks)) copy(callbacks, d.updateCallbacks) instancesCopy := d.getInstancesLocked() // Release lock before callbacks d.mu.Unlock() for _, callback := range callbacks { callback(instancesCopy) } d.mu.Lock() // Re-acquire for defer } return nil } // isProcessRunning checks if a process with the given PID is currently running func (d *Discovery) isProcessRunning(pid int) bool { if pid <= 0 { return false } // Use Linux /proc filesystem approach which is more reliable procPath := fmt.Sprintf("/proc/%d", pid) if _, err := os.Stat(procPath); os.IsNotExist(err) { return false } // Check if the process is actually running by reading its status statusPath := filepath.Join(procPath, "stat") if _, err := os.Stat(statusPath); os.IsNotExist(err) { return false } // If /proc/PID/stat exists, the process is running return true }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.