discovery_comprehensive_test.go
package discovery

import (
	"fmt"
	"os"
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

// TestHelper provides utilities for discovery testing with proper cleanup
type TestHelper struct {
	t           *testing.T
	tempDirs    []string
	discoveries []*Discovery
	mu          sync.Mutex
}

// NewTestHelper creates a new test helper
func NewTestHelper(t *testing.T) *TestHelper {
	return &TestHelper{
		t:           t,
		tempDirs:    []string{},
		discoveries: []*Discovery{},
	}
}

// CreateDiscovery creates a new discovery instance with automatic cleanup
func (th *TestHelper) CreateDiscovery() (*Discovery, string) {
	tempDir := th.t.TempDir()
	instancesDir := filepath.Join(tempDir, "instances")

	discovery, err := New(instancesDir)
	if err != nil {
		th.t.Fatalf("Failed to create discovery: %v", err)
	}

	th.mu.Lock()
	th.tempDirs = append(th.tempDirs, tempDir)
	th.discoveries = append(th.discoveries, discovery)
	th.mu.Unlock()

	return discovery, instancesDir
}

// Cleanup stops all discoveries and cleans up temp directories
func (th *TestHelper) Cleanup() {
	th.mu.Lock()
	defer th.mu.Unlock()

	for _, d := range th.discoveries {
		if err := d.Stop(); err != nil {
			th.t.Logf("Warning: failed to stop discovery: %v", err)
		}
	}
}

// CreateValidInstance creates a valid instance for testing
func (th *TestHelper) CreateValidInstance(id string, port int) *Instance {
	now := time.Now()
	return &Instance{
		ID:        id,
		Name:      fmt.Sprintf("Test Instance %s", id),
		Directory: fmt.Sprintf("/test/dir/%s", id),
		Port:      port,
		StartedAt: now,
		LastPing:  now,
		ProcessInfo: struct {
			PID        int    `json:"pid"`
			Executable string `json:"executable"`
		}{
			PID:        os.Getpid(), // Use current process PID for valid process
			Executable: "brum",
		},
	}
}

// WaitForCondition waits for a condition to be true with timeout
func (th *TestHelper) WaitForCondition(timeout time.Duration, checkInterval time.Duration, condition func() bool) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if condition() {
			return true
		}
		time.Sleep(checkInterval)
	}
	return false
}

// TestDiscoveryDirectoryCreation verifies discovery creates directories properly
func TestDiscoveryDirectoryCreation(t *testing.T) {
	t.Parallel()

	// Test 1: Directory doesn't exist - should be created
	tempDir := t.TempDir()
	instancesDir := filepath.Join(tempDir, "non-existent", "path", "instances")

	discovery, err := New(instancesDir)
	if err != nil {
		t.Fatalf("Should create directory structure: %v", err)
	}
	// Don't stop discovery since we never started it
	defer discovery.watcher.Close()

	// Verify directory was created with correct permissions
	info, err := os.Stat(instancesDir)
	if err != nil {
		t.Fatalf("Directory should exist: %v", err)
	}
	if !info.IsDir() {
		t.Error("Expected directory to be created")
	}

	// Test 2: Directory with no write permissions (skip on Windows).
	// Note: runtime.GOOS (not the GOOS environment variable, which is a
	// build-time setting and usually unset at run time) identifies the host OS.
	if runtime.GOOS != "windows" {
		readOnlyDir := filepath.Join(tempDir, "readonly")
		if err := os.MkdirAll(readOnlyDir, 0555); err != nil {
			t.Fatalf("Failed to create readonly dir: %v", err)
		}

		_, err = New(filepath.Join(readOnlyDir, "instances"))
		if err == nil {
			t.Error("Should fail with permission denied")
		}
	}
}

// TestInstanceFileValidation tests comprehensive instance validation
func TestInstanceFileValidation(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	discovery, instancesDir := th.CreateDiscovery()
	discovery.Start()

	// Test cases for invalid instances
	testCases := []struct {
		name          string
		instance      *Instance
		shouldFail    bool
		errorContains string
	}{
		{
			name:       "Valid instance",
			instance:   th.CreateValidInstance("valid-1", 7777),
			shouldFail: false,
		},
		{
			name: "Missing ID",
			instance: &Instance{
				Name:      "No ID",
				Directory: "/test",
				Port:      7778,
				StartedAt: time.Now(),
				LastPing:  time.Now(),
			},
			shouldFail:    true,
			errorContains: "missing ID",
		},
		{
			name: "Invalid port - zero",
			instance: &Instance{
				ID:        "bad-port-1",
				Name:      "Bad Port",
				Directory: "/test",
				Port:      0,
				StartedAt: time.Now(),
				LastPing:  time.Now(),
			},
			shouldFail:    true,
			errorContains: "invalid Port",
		},
		{
			name: "Invalid port - too high",
			instance: &Instance{
				ID:        "bad-port-2",
				Name:      "Bad Port",
				Directory: "/test",
				Port:      70000,
				StartedAt: time.Now(),
				LastPing:  time.Now(),
			},
			shouldFail:    true,
			errorContains: "invalid Port",
		},
		{
			name: "Future timestamp",
			instance: &Instance{
				ID:        "future-time",
				Name:      "Future Instance",
				Directory: "/test",
				Port:      7779,
				StartedAt: time.Now().Add(2 * time.Hour),
				LastPing:  time.Now(),
			},
			shouldFail:    true,
			errorContains: "StartedAt is in the future",
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			err := RegisterInstance(instancesDir, tc.instance)

			if tc.shouldFail {
				if err == nil {
					t.Errorf("Expected error for %s", tc.name)
				} else if tc.errorContains != "" && !strings.Contains(err.Error(), tc.errorContains) {
					t.Errorf("Expected error containing '%s', got: %v", tc.errorContains, err)
				}
			} else {
				if err != nil {
					t.Errorf("Unexpected error for %s: %v", tc.name, err)
				}

				// Verify instance was registered
				if !th.WaitForCondition(2*time.Second, 50*time.Millisecond, func() bool {
					instances := discovery.GetInstances()
					_, exists := instances[tc.instance.ID]
					return exists
				}) {
					t.Errorf("Instance %s was not discovered", tc.instance.ID)
				}
			}
		})
	}
}
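
// validateInstanceSketch is an illustrative sketch, not part of the original
// file: it reconstructs the kind of checks validateInstance is expected to
// perform, inferred only from the error strings asserted in
// TestInstanceFileValidation above. The real implementation in this package
// may differ in details such as the exact port bounds or clock-skew margin.
func validateInstanceSketch(inst *Instance) error {
	if inst.ID == "" {
		return fmt.Errorf("instance missing ID")
	}
	if inst.Port < 1 || inst.Port > 65535 {
		return fmt.Errorf("invalid Port %d: must be in 1-65535", inst.Port)
	}
	// Allow a small margin for clock skew before rejecting future timestamps.
	if inst.StartedAt.After(time.Now().Add(time.Minute)) {
		return fmt.Errorf("StartedAt is in the future")
	}
	return nil
}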
// TestFileWatcherReliability tests that file watcher catches all changes
func TestFileWatcherReliability(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	discovery, instancesDir := th.CreateDiscovery()

	// Track all updates
	var updateMu sync.Mutex
	updateCount := int32(0)
	instancesSeen := make(map[string]bool)

	discovery.OnUpdate(func(instances map[string]*Instance) {
		updateMu.Lock()
		defer updateMu.Unlock()
		atomic.AddInt32(&updateCount, 1)
		for id := range instances {
			instancesSeen[id] = true
		}
	})

	discovery.Start()

	// Register multiple instances rapidly
	numInstances := 10
	for i := 0; i < numInstances; i++ {
		instance := th.CreateValidInstance(fmt.Sprintf("rapid-%d", i), 8000+i)
		if err := RegisterInstance(instancesDir, instance); err != nil {
			t.Fatalf("Failed to register instance %d: %v", i, err)
		}
	}

	// Wait for all instances to be discovered
	if !th.WaitForCondition(5*time.Second, 100*time.Millisecond, func() bool {
		updateMu.Lock()
		defer updateMu.Unlock()
		return len(instancesSeen) >= numInstances
	}) {
		updateMu.Lock()
		seen := len(instancesSeen)
		updateMu.Unlock()
		t.Errorf("Not all instances discovered: expected %d, got %d", numInstances, seen)
	}

	// Verify final state
	instances := discovery.GetInstances()
	if len(instances) != numInstances {
		t.Errorf("Final instance count mismatch: expected %d, got %d", numInstances, len(instances))
	}

	// Test rapid modifications
	for i := 0; i < 5; i++ {
		instanceID := fmt.Sprintf("rapid-%d", i)
		if err := UpdateInstancePing(instancesDir, instanceID); err != nil {
			t.Errorf("Failed to update ping for %s: %v", instanceID, err)
		}
	}

	// Small delay to ensure updates are processed
	// This is needed because file system events may be batched
	time.Sleep(100 * time.Millisecond)

	// Remove half the instances
	removedCount := numInstances / 2
	for i := 0; i < removedCount; i++ {
		instanceID := fmt.Sprintf("rapid-%d", i)
		if err := UnregisterInstance(instancesDir, instanceID); err != nil {
			t.Errorf("Failed to unregister %s: %v", instanceID, err)
		}
	}

	// Wait for removals to be processed
	expectedRemaining := numInstances - removedCount
	if !th.WaitForCondition(3*time.Second, 100*time.Millisecond, func() bool {
		instances := discovery.GetInstances()
		return len(instances) == expectedRemaining
	}) {
		instances := discovery.GetInstances()
		t.Errorf("Instance removal not detected: expected %d, got %d", expectedRemaining, len(instances))
	}
}

// TestDiscoveryConcurrentOperations tests thread safety of discovery file operations
func TestDiscoveryConcurrentOperations(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	discovery, instancesDir := th.CreateDiscovery()
	discovery.Start()

	// Concurrent operations configuration
	numGoroutines := 20
	operationsPerGoroutine := 50

	var wg sync.WaitGroup
	errors := make(chan error, numGoroutines*operationsPerGoroutine)

	// Launch concurrent operations
	for g := 0; g < numGoroutines; g++ {
		wg.Add(1)
		go func(goroutineID int) {
			defer wg.Done()

			for op := 0; op < operationsPerGoroutine; op++ {
				instanceID := fmt.Sprintf("concurrent-%d-%d", goroutineID, op)

				// Register instance
				instance := th.CreateValidInstance(instanceID, 9000+goroutineID*100+op)
				if err := RegisterInstance(instancesDir, instance); err != nil {
					errors <- fmt.Errorf("register %s: %w", instanceID, err)
					continue
				}

				// Update ping
				if err := UpdateInstancePing(instancesDir, instanceID); err != nil {
					errors <- fmt.Errorf("update %s: %w", instanceID, err)
				}

				// Read back
				afo := NewAtomicFileOperations(instancesDir)
				if _, err := afo.SafeReadInstance(instanceID); err != nil {
					errors <- fmt.Errorf("read %s: %w", instanceID, err)
				}

				// Randomly unregister some instances
				if op%3 == 0 {
					if err := UnregisterInstance(instancesDir, instanceID); err != nil {
						errors <- fmt.Errorf("unregister %s: %w", instanceID, err)
					}
				}
			}
		}(g)
	}

	// Wait for all operations to complete
	wg.Wait()
	close(errors)

	// Check for errors
	errorCount := 0
	for err := range errors {
		t.Logf("Concurrent operation error: %v", err)
		errorCount++
	}

	if errorCount > 0 {
		t.Errorf("Had %d errors during concurrent operations", errorCount)
	}

	// Verify final state is consistent
	instances := discovery.GetInstances()
	t.Logf("Final instance count: %d", len(instances))

	// Verify all remaining instances are valid
	for id, inst := range instances {
		if err := validateInstance(inst); err != nil {
			t.Errorf("Instance %s is invalid after concurrent ops: %v", id, err)
		}
	}
}
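
// writeInstanceFileAtomicSketch is an illustrative sketch, not part of the
// original file: it shows the write-to-temp-then-rename pattern that
// AtomicFileOperations presumably relies on to make the concurrent
// register/read/unregister operations above safe. The filename layout
// (instanceID + ".json") is an assumption for illustration.
func writeInstanceFileAtomicSketch(instancesDir, instanceID string, data []byte) error {
	final := filepath.Join(instancesDir, instanceID+".json")
	tmp := final + ".tmp"
	if err := os.WriteFile(tmp, data, 0644); err != nil {
		return fmt.Errorf("write temp file: %w", err)
	}
	// Rename atomically replaces the destination on POSIX filesystems, so a
	// concurrent reader sees either the old complete file or the new complete
	// file, never a partially written one.
	if err := os.Rename(tmp, final); err != nil {
		os.Remove(tmp) // best-effort cleanup of the temp file
		return fmt.Errorf("rename into place: %w", err)
	}
	return nil
}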
// TestStaleInstanceCleanup tests cleanup of dead instances
func TestStaleInstanceCleanup(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	discovery, instancesDir := th.CreateDiscovery()
	discovery.Start()

	// Create instance with stale LastPing
	staleInstance := th.CreateValidInstance("stale-1", 7777)
	staleInstance.LastPing = time.Now().Add(-10 * time.Minute) // Very stale
	staleInstance.ProcessInfo.PID = 99999                      // Non-existent process

	if err := RegisterInstance(instancesDir, staleInstance); err != nil {
		t.Fatalf("Failed to register stale instance: %v", err)
	}

	// Create fresh instance
	freshInstance := th.CreateValidInstance("fresh-1", 7778)
	if err := RegisterInstance(instancesDir, freshInstance); err != nil {
		t.Fatalf("Failed to register fresh instance: %v", err)
	}

	// Wait for discovery
	if !th.WaitForCondition(2*time.Second, 50*time.Millisecond, func() bool {
		instances := discovery.GetInstances()
		return len(instances) == 2
	}) {
		instances := discovery.GetInstances()
		t.Fatalf("Expected 2 instances, got %d", len(instances))
	}

	// Run cleanup
	if err := discovery.CleanupStaleInstances(); err != nil {
		t.Fatalf("Cleanup failed: %v", err)
	}

	// Verify stale instance was removed
	finalInstances := discovery.GetInstances()
	if len(finalInstances) != 1 {
		t.Errorf("Expected 1 instance after cleanup, got %d", len(finalInstances))
	}

	if _, exists := finalInstances["stale-1"]; exists {
		t.Error("Stale instance should have been removed")
	}

	if _, exists := finalInstances["fresh-1"]; !exists {
		t.Error("Fresh instance should still exist")
	}
}
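
// isStaleSketch is an illustrative sketch, not part of the original file: it
// captures the staleness rule CleanupStaleInstances appears to apply in
// TestStaleInstanceCleanup, where an instance with a 10-minute-old LastPing
// is removed. The 5-minute threshold here is an assumed value, not taken
// from the source; the real implementation likely also verifies that
// ProcessInfo.PID still refers to a live process (e.g. by sending signal 0
// on Unix), which is why the test also sets a non-existent PID.
func isStaleSketch(inst *Instance, now time.Time) bool {
	const staleAfter = 5 * time.Minute // assumed threshold for illustration
	return now.Sub(inst.LastPing) > staleAfter
}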
// TestFileCorruptionHandling tests handling of corrupted instance files
func TestFileCorruptionHandling(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	discovery, instancesDir := th.CreateDiscovery()

	// Write corrupted files directly
	corruptedFiles := []struct {
		filename string
		content  string
	}{
		{"corrupt-1.json", "not json at all"},
		{"corrupt-2.json", `{"id": "missing-fields"}`},            // Missing required fields
		{"corrupt-3.json", `{]`},                                  // Invalid JSON
		{"corrupt-4.json", `{"id":"test","port":"not-a-number"}`}, // Type mismatch
	}

	for _, cf := range corruptedFiles {
		path := filepath.Join(instancesDir, cf.filename)
		if err := os.WriteFile(path, []byte(cf.content), 0644); err != nil {
			t.Fatalf("Failed to write corrupted file: %v", err)
		}
	}

	// Write one valid instance
	validInstance := th.CreateValidInstance("valid-among-corrupt", 7777)
	if err := RegisterInstance(instancesDir, validInstance); err != nil {
		t.Fatalf("Failed to register valid instance: %v", err)
	}

	// Start discovery - it should handle corrupted files gracefully
	discovery.Start()

	// Wait for discovery to process files
	if !th.WaitForCondition(2*time.Second, 50*time.Millisecond, func() bool {
		instances := discovery.GetInstances()
		// Should find exactly one valid instance among the corrupted files
		return len(instances) == 1
	}) {
		instances := discovery.GetInstances()
		t.Errorf("Expected 1 valid instance, got %d", len(instances))
	}

	// Verify the valid instance was discovered
	validInstances := discovery.GetInstances()
	if _, exists := validInstances["valid-among-corrupt"]; !exists {
		t.Error("Valid instance should be discovered despite corrupted files")
	}
}

// TestCallbackErrorHandling tests that callbacks don't break discovery
func TestCallbackErrorHandling(t *testing.T) {
	t.Skip("Skipping test that intentionally panics - panics cannot be recovered across goroutines")

	// NOTE: In a real system, callbacks should be wrapped with recover()
	// to prevent panics from breaking the discovery system.
	// This test demonstrates that the current implementation does NOT
	// handle panics in callbacks gracefully.
}
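
// safeCallbackSketch is an illustrative sketch, not part of the original
// file: it shows the recover() wrapper the NOTE in TestCallbackErrorHandling
// recommends. Each invocation runs behind a deferred recover, so a panicking
// subscriber cannot take down the goroutine that dispatches discovery
// updates. The logf parameter is a stand-in for whatever logger the caller
// uses (t.Logf in tests).
func safeCallbackSketch(logf func(string, ...interface{}), cb func(map[string]*Instance)) func(map[string]*Instance) {
	return func(instances map[string]*Instance) {
		defer func() {
			if r := recover(); r != nil {
				// Log and swallow the panic instead of crashing the dispatcher.
				logf("discovery callback panicked: %v", r)
			}
		}()
		cb(instances)
	}
}

// Usage sketch: discovery.OnUpdate(safeCallbackSketch(t.Logf, handleUpdate))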
// TestHubDiscoveryIntegration tests full hub discovery flow
func TestHubDiscoveryIntegration(t *testing.T) {
	t.Parallel()

	th := NewTestHelper(t)
	defer th.Cleanup()

	// Simulate multiple instances registering
	instancesDir := filepath.Join(t.TempDir(), "hub-instances")

	// Instance 1: Frontend
	frontend := &Instance{
		ID:        "frontend-abc123",
		Name:      "Frontend Dev Server",
		Directory: "/projects/frontend",
		Port:      7777,
		StartedAt: time.Now(),
		LastPing:  time.Now(),
		ProcessInfo: struct {
			PID        int    `json:"pid"`
			Executable string `json:"executable"`
		}{
			PID:        os.Getpid(),
			Executable: "brum",
		},
	}

	// Instance 2: Backend
	backend := &Instance{
		ID:        "backend-def456",
		Name:      "Backend API Server",
		Directory: "/projects/backend",
		Port:      7778,
		StartedAt: time.Now(),
		LastPing:  time.Now(),
		ProcessInfo: struct {
			PID        int    `json:"pid"`
			Executable string `json:"executable"`
		}{
			PID:        os.Getpid() + 1, // Simulate different process
			Executable: "brum",
		},
	}

	// Create discovery system (simulating hub)
	discovery, err := New(instancesDir)
	if err != nil {
		t.Fatalf("Failed to create hub discovery: %v", err)
	}
	defer discovery.Stop()

	// Track discovery events
	var discoveredMu sync.Mutex
	discovered := make(map[string]*Instance)
	removed := make(map[string]bool)

	discovery.OnUpdate(func(instances map[string]*Instance) {
		discoveredMu.Lock()
		defer discoveredMu.Unlock()

		// Track additions
		for id, inst := range instances {
			if _, exists := discovered[id]; !exists {
				discovered[id] = inst
				t.Logf("Discovered instance: %s", id)
			}
		}

		// Track removals
		for id := range discovered {
			if _, exists := instances[id]; !exists {
				removed[id] = true
				t.Logf("Removed instance: %s", id)
			}
		}
	})

	discovery.Start()

	// Register instances (simulating instance startup)
	if err := RegisterInstance(instancesDir, frontend); err != nil {
		t.Fatalf("Failed to register frontend: %v", err)
	}
	if err := RegisterInstance(instancesDir, backend); err != nil {
		t.Fatalf("Failed to register backend: %v", err)
	}

	// Wait for discovery
	if !th.WaitForCondition(2*time.Second, 50*time.Millisecond, func() bool {
		discoveredMu.Lock()
		defer discoveredMu.Unlock()
		return len(discovered) == 2
	}) {
		t.Fatalf("Not all instances discovered")
	}

	// Simulate frontend updating its ping
	time.Sleep(100 * time.Millisecond)
	if err := UpdateInstancePing(instancesDir, "frontend-abc123"); err != nil {
		t.Errorf("Failed to update frontend ping: %v", err)
	}

	// Simulate backend shutdown
	if err := UnregisterInstance(instancesDir, "backend-def456"); err != nil {
		t.Errorf("Failed to unregister backend: %v", err)
	}

	// Wait for removal to be detected
	if !th.WaitForCondition(2*time.Second, 50*time.Millisecond, func() bool {
		discoveredMu.Lock()
		defer discoveredMu.Unlock()
		return removed["backend-def456"]
	}) {
		t.Error("Backend removal not detected")
	}

	// Final state check
	instances := discovery.GetInstances()
	if len(instances) != 1 {
		t.Errorf("Expected 1 instance in final state, got %d", len(instances))
	}
	if _, exists := instances["frontend-abc123"]; !exists {
		t.Error("Frontend should still be registered")
	}
}
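
// loadInstancesSketch is an illustrative sketch, not part of the original
// file: it shows the tolerant directory scan that TestFileCorruptionHandling
// above exercises. Files that fail to parse or validate are skipped rather
// than aborting the scan, so one corrupt JSON file never hides the valid
// instances. The parse parameter stands in for the package's real JSON
// decode-and-validate step, whose exact shape is not shown in this file.
func loadInstancesSketch(instancesDir string, parse func(path string) (*Instance, error)) map[string]*Instance {
	instances := make(map[string]*Instance)
	entries, err := os.ReadDir(instancesDir)
	if err != nil {
		return instances
	}
	for _, entry := range entries {
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
			continue
		}
		inst, err := parse(filepath.Join(instancesDir, entry.Name()))
		if err != nil {
			// Corrupt or invalid file: skip it and keep scanning.
			continue
		}
		instances[inst.ID] = inst
	}
	return instances
}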
