Skip to main content
Glama
atomic_ops.go8.91 kB
package discovery

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/gofrs/flock"
)

// AtomicFileOperations performs instance-file reads and writes while holding
// an exclusive lock on a lock file inside instancesDir.
type AtomicFileOperations struct {
	instancesDir string        // directory holding the <id>.json files and the lock file
	lockTimeout  time.Duration // upper bound on how long withLock waits for the lock
}

// NewAtomicFileOperations creates a manager for the given instances directory
// with a default lock timeout of 30 seconds.
func NewAtomicFileOperations(instancesDir string) *AtomicFileOperations {
	return &AtomicFileOperations{
		instancesDir: instancesDir,
		lockTimeout:  30 * time.Second, // Bounded wait so a stuck lock holder cannot block callers forever
	}
}

// SafeUpdateInstance writes the instance to its file while holding the lock.
// It returns an error if the lock cannot be acquired or the write fails.
func (afo *AtomicFileOperations) SafeUpdateInstance(instance *Instance) error {
	return afo.withLock(func() error {
		return afo.atomicWriteInstance(instance)
	})
}

// SafeUpdateInstancePing sets LastPing of the stored instance to the current
// time. The read-modify-write happens under a single lock acquisition.
func (afo *AtomicFileOperations) SafeUpdateInstancePing(instanceID string) error {
	return afo.withLock(func() error {
		// Read existing instance data
		instance, err := afo.readInstanceLocked(instanceID)
		if err != nil {
			return fmt.Errorf("failed to read instance: %w", err)
		}

		// Update last ping time
		instance.LastPing = time.Now()

		// Write back atomically
		return afo.atomicWriteInstance(instance)
	})
}

// SafeRegisterInstance validates the instance and writes it to disk while
// holding the lock. It returns an error if validation fails, the instances
// directory cannot be created, or the write fails.
func (afo *AtomicFileOperations) SafeRegisterInstance(instance *Instance) error {
	return afo.withLock(func() error {
		// Validate instance before writing
		if err := validateInstance(instance); err != nil {
			return fmt.Errorf("invalid instance: %w", err)
		}

		// Ensure instances directory exists with proper permissions
		if err := os.MkdirAll(afo.instancesDir, DefaultDirMode); err != nil {
			return fmt.Errorf("failed to create instances directory: %w", err)
		}

		return afo.atomicWriteInstance(instance)
	})
}

// SafeUnregisterInstance removes the instance file while holding the lock.
// A file that does not exist is not treated as an error.
func (afo *AtomicFileOperations) SafeUnregisterInstance(instanceID string) error {
	return afo.withLock(func() error {
		path := afo.instanceFilePath(instanceID)
		if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("failed to remove instance file: %w", err)
		}
		return nil
	})
}

// SafeReadInstance reads and validates one instance while holding the lock.
// On any error the returned instance is nil.
func (afo *AtomicFileOperations) SafeReadInstance(instanceID string) (*Instance, error) {
	var instance *Instance
	err := afo.withLock(func() error {
		var readErr error
		instance, readErr = afo.readInstanceLocked(instanceID)
		return readErr
	})
	if err != nil {
		return nil, err
	}
	return instance, nil
}

// SafeListInstances returns all readable instances keyed by instance ID,
// collected while holding the lock. On error the returned map is nil.
func (afo *AtomicFileOperations) SafeListInstances() (map[string]*Instance, error) {
	var instances map[string]*Instance
	err := afo.withLock(func() error {
		var listErr error
		instances, listErr = afo.listInstancesLocked()
		return listErr
	})
	if err != nil {
		return nil, err
	}
	return instances, nil
}

// instanceFilePath returns the path of the JSON file that stores instanceID.
func (afo *AtomicFileOperations) instanceFilePath(instanceID string) string {
	return filepath.Join(afo.instancesDir, fmt.Sprintf("%s.json", instanceID))
}

// withLock runs fn while holding an exclusive lock on ".discovery.lock" in the
// instances directory. It creates the directory if needed, retries the lock
// every 100ms until lockTimeout elapses, and releases the lock after fn
// returns. The error from fn is returned unchanged.
func (afo *AtomicFileOperations) withLock(fn func() error) error {
	// Create lock file path
	lockFile := filepath.Join(afo.instancesDir, ".discovery.lock")

	// Ensure the lock directory exists
	if err := os.MkdirAll(afo.instancesDir, DefaultDirMode); err != nil {
		return fmt.Errorf("failed to create lock directory: %w", err)
	}

	// Create file lock
	fileLock := flock.New(lockFile)

	// Set timeout to prevent indefinite blocking
	ctx, cancel := context.WithTimeout(context.Background(), afo.lockTimeout)
	defer cancel()

	// Acquire lock with timeout, retrying every 100ms
	locked, err := fileLock.TryLockContext(ctx, 100*time.Millisecond)
	if err != nil {
		return fmt.Errorf("failed to acquire lock: %w", err)
	}
	if !locked {
		return fmt.Errorf("failed to acquire lock within timeout (%v)", afo.lockTimeout)
	}

	// Ensure lock is released
	defer func() {
		if unlockErr := fileLock.Unlock(); unlockErr != nil {
			// Report the failure without overriding fn's result. The warning
			// goes to stderr so it does not mix with regular program output.
			fmt.Fprintf(os.Stderr, "Warning: failed to release lock: %v\n", unlockErr)
		}
	}()

	// Execute the function while holding the lock
	return fn()
}

// atomicWriteInstance marshals the instance as indented JSON and writes it to
// <instancesDir>/<ID>.json via atomicWriteFile. It must be called while the
// lock is held. A nil instance is rejected with an error.
func (afo *AtomicFileOperations) atomicWriteInstance(instance *Instance) error {
	// Guard against a nil pointer: instance.ID is dereferenced below.
	if instance == nil {
		return fmt.Errorf("cannot write nil instance")
	}

	// Marshal instance data
	data, err := json.MarshalIndent(instance, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal instance data: %w", err)
	}

	// Write atomically using temp file + rename
	finalPath := afo.instanceFilePath(instance.ID)
	if err := afo.atomicWriteFile(finalPath, data, DefaultFileMode); err != nil {
		return fmt.Errorf("failed to write instance file: %w", err)
	}

	return nil
}

// readInstanceLocked reads, unmarshals and validates the instance stored for
// instanceID. It must be called while the lock is held.
func (afo *AtomicFileOperations) readInstanceLocked(instanceID string) (*Instance, error) {
	path := afo.instanceFilePath(instanceID)

	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read instance file: %w", err)
	}

	var instance Instance
	if err := json.Unmarshal(data, &instance); err != nil {
		return nil, fmt.Errorf("failed to unmarshal instance data: %w", err)
	}

	// Validate instance data
	if err := validateInstance(&instance); err != nil {
		return nil, fmt.Errorf("invalid instance data: %w", err)
	}

	return &instance, nil
}

// listInstancesLocked reads every regular "*.json" file in the instances
// directory and returns the instances keyed by file name without extension.
// Files that cannot be read or validated are skipped with a warning. A missing
// directory yields an empty map. It must be called while the lock is held.
func (afo *AtomicFileOperations) listInstancesLocked() (map[string]*Instance, error) {
	instances := make(map[string]*Instance)

	// Check if directory exists
	if _, err := os.Stat(afo.instancesDir); os.IsNotExist(err) {
		return instances, nil // Return empty map if directory doesn't exist
	}

	// Read directory entries
	entries, err := os.ReadDir(afo.instancesDir)
	if err != nil {
		return nil, fmt.Errorf("failed to read instances directory: %w", err)
	}

	// Process each .json file
	for _, entry := range entries {
		if entry.IsDir() || !entry.Type().IsRegular() {
			continue
		}

		name := entry.Name()
		if filepath.Ext(name) != ".json" || name == ".discovery.lock" {
			continue
		}

		// Extract instance ID from filename by dropping the ".json" extension
		instanceID := name[:len(name)-len(".json")]

		// Read instance
		instance, err := afo.readInstanceLocked(instanceID)
		if err != nil {
			// One unreadable file must not hide the remaining instances, so
			// warn on stderr and keep going.
			fmt.Fprintf(os.Stderr, "Warning: failed to read instance %s: %v\n", instanceID, err)
			continue
		}

		instances[instanceID] = instance
	}

	return instances, nil
}

// atomicWriteFile writes data to path by creating a temp file in the same
// directory, syncing and closing it, setting perm, and renaming it over path.
// If any step fails the temp file is removed and an error is returned.
func (afo *AtomicFileOperations) atomicWriteFile(path string, data []byte, perm os.FileMode) error {
	// Create directory if needed
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, DefaultDirMode); err != nil {
		return fmt.Errorf("failed to create directory: %w", err)
	}

	// Create temp file in same directory (for atomic rename)
	tempFile, err := os.CreateTemp(dir, ".tmp-instance-")
	if err != nil {
		return fmt.Errorf("failed to create temp file: %w", err)
	}
	tempPath := tempFile.Name()

	// Cleanup stays armed until the rename succeeds, so a failure in any
	// step, including chmod and rename, never leaves a stray temp file.
	closed, committed := false, false
	defer func() {
		if committed {
			return
		}
		if !closed {
			_ = tempFile.Close() // best effort; the primary error is already being returned
		}
		_ = os.Remove(tempPath) // best effort; nothing more can be done if this fails
	}()

	// Write data
	if _, err := tempFile.Write(data); err != nil {
		return fmt.Errorf("failed to write data: %w", err)
	}

	// Sync to disk to ensure data is written
	if err := tempFile.Sync(); err != nil {
		return fmt.Errorf("failed to sync file: %w", err)
	}

	// Close before rename
	closeErr := tempFile.Close()
	closed = true
	if closeErr != nil {
		return fmt.Errorf("failed to close temp file: %w", closeErr)
	}

	// Set permissions
	if err := os.Chmod(tempPath, perm); err != nil {
		return fmt.Errorf("failed to set permissions: %w", err)
	}

	// Atomic rename (this is the atomic operation)
	if err := os.Rename(tempPath, path); err != nil {
		return fmt.Errorf("failed to rename file: %w", err)
	}
	committed = true

	return nil
}

// GetLockTimeout returns the current lock timeout.
func (afo *AtomicFileOperations) GetLockTimeout() time.Duration {
	return afo.lockTimeout
}

// SetLockTimeout sets the lock timeout (useful for testing).
func (afo *AtomicFileOperations) SetLockTimeout(timeout time.Duration) {
	afo.lockTimeout = timeout
}

// IsInstanceFileCorrupted reports whether SafeReadInstance fails for
// instanceID. It returns true for any failure, including a missing file or a
// lock that could not be acquired, not only for malformed content.
func (afo *AtomicFileOperations) IsInstanceFileCorrupted(instanceID string) bool {
	_, err := afo.SafeReadInstance(instanceID)
	return err != nil
}

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server.