# Implementation Step 2: Instance Discovery

## Overview

This step implements the file-based instance discovery system. Instances write JSON files to signal their availability, and the hub watches these files to initiate connections. The files are only signals: the actual connection state determines availability.

## Goals

1. Implement a file watcher for instance discovery
2. Ensure instances register only AFTER the MCP server is listening
3. Use OS-specific configuration directories
4. Make file operations non-blocking with channels
5. Connect discovery to the hub's `instances/list` tool

## Technical Design

### Discovery Flow

```
Instance Startup:
1. Start MCP server (HTTP)
2. Call net.Listen()
3. Get actual port from listener
4. Write instance file with port
5. File appears in discovery directory

Hub Discovery:
1. Watch discovery directory
2. Detect new instance file
3. Parse instance metadata
4. Initiate connection (step 3)
5. Add to available instances
```

### File Structure

Example path (Linux): `~/.local/share/brummer/instances/{instance-id}.json`

```json
{
  "id": "uuid-v4",
  "pid": 12345,
  "path": "/home/user/project",
  "port": 7778,
  "started": "2024-01-01T00:00:00Z",
  "name": "my-project",
  "has_package_json": true,
  "last_seen": "2024-01-01T00:00:00Z"
}
```

### Directory Structure

```
~/.local/share/brummer/
├── instances/
│   ├── {instance-id-1}.json
│   ├── {instance-id-2}.json
│   └── ...
└── temp/
    └── {instance-id}.json.tmp   # For atomic writes
```

## Implementation

### 1. Update Instance Registration

```go
// internal/mcp/server.go
func StartInstanceServer(ctx context.Context, port int, projectPath string) error {
	// Create MCP server first
	server := NewServer(/* ... */)

	// CRITICAL: Listen first, register after
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}
	defer listener.Close()

	// Close the listener when ctx is cancelled so Serve returns and the
	// deferred Unregister below removes the instance file
	go func() {
		<-ctx.Done()
		listener.Close()
	}()

	// Get actual port (might be different if port was 0)
	actualPort := listener.Addr().(*net.TCPAddr).Port

	// Now register with discovery
	registry := discovery.NewRegistry()
	projectName := getProjectName(projectPath)
	hasPackageJSON := fileExists(filepath.Join(projectPath, "package.json"))
	if err := registry.Register(projectPath, actualPort, projectName, hasPackageJSON); err != nil {
		// Log but don't fail: the instance can run without discovery
		log.Printf("Failed to register with discovery: %v", err)
	}

	// Ensure cleanup on shutdown (runs before listener.Close, since defers are LIFO)
	defer func() {
		if err := registry.Unregister(); err != nil {
			log.Printf("Failed to unregister: %v", err)
		}
	}()

	// Start serving
	return server.Serve(listener)
}
```

### 2. Instance Watcher Implementation

```go
// internal/mcp/instance_watcher.go
package mcp

import (
	"context"
	"encoding/json"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/fsnotify/fsnotify"
	"github.com/standardbeagle/brummer/internal/discovery"
)

type InstanceWatcher struct {
	instancesChan chan<- *discovery.Instance
	errorsChan    chan<- error
	watcher       *fsnotify.Watcher
	dir           string // Directory to watch; empty means discovery.GetInstancesDir() (tests override it)
}

func NewInstanceWatcher(instancesChan chan<- *discovery.Instance, errorsChan chan<- error) (*InstanceWatcher, error) {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	return &InstanceWatcher{
		instancesChan: instancesChan,
		errorsChan:    errorsChan,
		watcher:       watcher,
	}, nil
}

func (iw *InstanceWatcher) Start(ctx context.Context) error {
	instancesDir := iw.dir
	if instancesDir == "" {
		dir, err := discovery.GetInstancesDir()
		if err != nil {
			return err
		}
		instancesDir = dir
	}

	// Create the directory on first use; fsnotify cannot watch a missing path
	if err := os.MkdirAll(instancesDir, 0755); err != nil {
		return err
	}

	// Watch the instances directory before scanning so no new file is missed
	if err := iw.watcher.Add(instancesDir); err != nil {
		return err
	}

	// Initial scan of existing files
	if err := iw.scanDirectory(instancesDir); err != nil {
		return err
	}

	// Watch for changes
	go iw.watchLoop(ctx)
	return nil
}

func (iw *InstanceWatcher) watchLoop(ctx context.Context) {
	defer iw.watcher.Close()
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-iw.watcher.Events:
			if !ok {
				return
			}
			// Only care about JSON files
			if !strings.HasSuffix(event.Name, ".json") {
				continue
			}
			switch {
			case event.Op&fsnotify.Create == fsnotify.Create:
				// Atomic renames into the directory also arrive as Create
				iw.handleNewFile(event.Name)
			case event.Op&fsnotify.Write == fsnotify.Write:
				iw.handleUpdatedFile(event.Name)
			case event.Op&fsnotify.Remove == fsnotify.Remove:
				iw.handleRemovedFile(event.Name)
			}
		case err, ok := <-iw.watcher.Errors:
			if !ok {
				return
			}
			select {
			case iw.errorsChan <- err:
			case <-ctx.Done():
				return
			}
		}
	}
}

func (iw *InstanceWatcher) handleNewFile(path string) {
	// Small delay in case a writer did not use the atomic temp -> rename path
	time.Sleep(50 * time.Millisecond)

	instance, err := iw.readInstanceFile(path)
	if err != nil {
		// Invalid or partial file: report without blocking, then skip it
		select {
		case iw.errorsChan <- err:
		default:
		}
		return
	}

	// Send to discovery channel without blocking the watch loop
	select {
	case iw.instancesChan <- instance:
	default:
		// Channel full, skip
	}
}

// handleUpdatedFile re-reads a rewritten file (e.g. a refreshed last_seen) and
// forwards it like a new one; receivers must tolerate repeated instance IDs.
func (iw *InstanceWatcher) handleUpdatedFile(path string) {
	iw.handleNewFile(path)
}

// handleRemovedFile is intentionally a no-op for now: files are only signals,
// and stale instances are handled by the connection manager in step 3.
func (iw *InstanceWatcher) handleRemovedFile(path string) {}

func (iw *InstanceWatcher) readInstanceFile(path string) (*discovery.Instance, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var instance discovery.Instance
	if err := json.Unmarshal(data, &instance); err != nil {
		return nil, err
	}
	return &instance, nil
}

func (iw *InstanceWatcher) scanDirectory(dir string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".json") {
			continue
		}
		path := filepath.Join(dir, entry.Name())
		instance, err := iw.readInstanceFile(path)
		if err != nil {
			continue // Skip bad files
		}
		select {
		case iw.instancesChan <- instance:
		default:
			// Channel full, skip
		}
	}
	return nil
}
```

### 3. Update Hub Server

```go
// internal/mcp/hub_server.go
type HubServer struct {
	connMgr       *ConnectionManager
	watcher       *InstanceWatcher
	instancesChan chan *discovery.Instance
	errorsChan    chan error
}

func NewHubServer() (*HubServer, error) {
	instancesChan := make(chan *discovery.Instance, 100)
	errorsChan := make(chan error, 100)

	watcher, err := NewInstanceWatcher(instancesChan, errorsChan)
	if err != nil {
		return nil, err
	}

	return &HubServer{
		connMgr:       NewConnectionManager(),
		watcher:       watcher,
		instancesChan: instancesChan,
		errorsChan:    errorsChan,
	}, nil
}

func (h *HubServer) Start(ctx context.Context) error {
	// Start instance watcher
	if err := h.watcher.Start(ctx); err != nil {
		return err
	}

	// Process discovered instances
	go h.processDiscoveredInstances(ctx)
	return nil
}

func (h *HubServer) processDiscoveredInstances(ctx context.Context) {
	for {
		select {
		case <-ctx.Done():
			return
		case instance := <-h.instancesChan:
			// For now, just track it.
			// In step 3, we'll establish connections.
			info := &ConnectionInfo{
				InstanceID:     instance.ID,
				Name:           instance.Name,
				Path:           instance.Path,
				Port:           instance.Port,
				PID:            instance.PID,
				HasPackageJSON: instance.HasPackageJSON,
				State:          StateListening,
				ConnectedAt:    time.Now(),
				LastActivity:   time.Now(),
			}
			if err := h.connMgr.Register(info); err != nil {
				log.Printf("Failed to register instance %s: %v", instance.ID, err)
			}
		case err := <-h.errorsChan:
			log.Printf("Discovery error: %v", err)
		}
	}
}

func (h *HubServer) CallTool(ctx context.Context, request *mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	switch request.Name {
	case "instances/list":
		// Get instances from connection manager
		instances := h.connMgr.List()

		// Convert to JSON
		data, err := json.Marshal(instances)
		if err != nil {
			return nil, err
		}

		return &mcp.CallToolResult{
			Content: []mcp.Content{{
				Type: "text",
				Text: string(data),
			}},
		}, nil

	// ... other tools ...

	default:
		return nil, fmt.Errorf("unknown tool: %s", request.Name)
	}
}
```

### 4. Atomic File Writing

```go
// internal/discovery/registry.go
func (r *Registry) writeInstance(instance *Instance) error {
	data, err := json.MarshalIndent(instance, "", "  ")
	if err != nil {
		return err
	}

	instancesDir, err := GetInstancesDir()
	if err != nil {
		return err
	}
	tempDir, err := GetTempDir()
	if err != nil {
		return err
	}

	// Create both directories on first use
	if err := os.MkdirAll(instancesDir, 0755); err != nil {
		return err
	}
	if err := os.MkdirAll(tempDir, 0755); err != nil {
		return err
	}

	// Write to temp file first. tempDir must be on the same filesystem as
	// instancesDir, otherwise os.Rename fails instead of being atomic.
	tempFile := filepath.Join(tempDir, instance.ID+".json.tmp")
	if err := os.WriteFile(tempFile, data, 0644); err != nil {
		return err
	}

	// Atomic rename to final location
	finalFile := filepath.Join(instancesDir, instance.ID+".json")
	if err := os.Rename(tempFile, finalFile); err != nil {
		os.Remove(tempFile) // Clean up on failure
		return err
	}

	return nil
}
```

## Testing Plan

### 1. Unit Tests

```go
// internal/mcp/instance_watcher_test.go
package mcp

import (
	"context"
	"encoding/json"
	"os"
	"path/filepath"
	"testing"
	"time"

	"github.com/standardbeagle/brummer/internal/discovery"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestInstanceWatcher(t *testing.T) {
	// Create temp directory
	tempDir := t.TempDir()

	// Create watcher and point it at the temp directory instead of the real one
	instancesChan := make(chan *discovery.Instance, 10)
	errorsChan := make(chan error, 10)
	watcher, err := NewInstanceWatcher(instancesChan, errorsChan)
	require.NoError(t, err)
	watcher.dir = tempDir

	// Start watching
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	err = watcher.Start(ctx)
	require.NoError(t, err)

	// Create instance file
	instance := &discovery.Instance{
		ID:   "test-123",
		Port: 7778,
		Name: "test-project",
	}
	data, err := json.Marshal(instance)
	require.NoError(t, err)
	err = os.WriteFile(filepath.Join(tempDir, "test-123.json"), data, 0644)
	require.NoError(t, err)

	// Should receive instance
	select {
	case received := <-instancesChan:
		assert.Equal(t, "test-123", received.ID)
	case <-time.After(time.Second):
		t.Fatal("timeout waiting for instance")
	}
}
```

### 2. Integration Test

```bash
#!/bin/bash
# test_discovery.sh

# Start hub
brum --mcp &
HUB_PID=$!
sleep 0.5

# Start an instance
cd test-project
brum --no-tui &
INSTANCE_PID=$!
sleep 1

# Query hub for instances
echo '{"jsonrpc":"2.0","id":1,"method":"tools/call","params":{"name":"instances/list"}}' | \
  brum --mcp | grep -q "test-project"
RESULT=$?

# Cleanup
kill $INSTANCE_PID
kill $HUB_PID

# Report the grep result as the script's exit status
exit $RESULT
```

### 3. File Watch Test

```go
func TestFileWatchEvents(t *testing.T) {
	// Test create, update, delete events
	// Verify each triggers appropriate action
	// Check atomic writes work correctly
}
```

## Success Criteria

1. ✅ Instance files created AFTER server is listening
2. ✅ Files use OS-specific config directories
3. ✅ Atomic file writes (temp → rename)
4. ✅ File watcher detects new instances in under 100 ms
5. ✅ `instances/list` returns discovered instances
6. ✅ No blocking file operations
7. ✅ Graceful handling of invalid files
8. ✅ Cleanup on instance shutdown

## Edge Cases

### 1. Directory Doesn't Exist
- Create the directory structure on first use
- Handle permission errors gracefully

### 2. Invalid JSON Files
- Skip the file and log the error
- Don't crash the watcher

### 3. Stale Instance Files
- Will be handled by the connection manager in step 3
- For now, include all files in the list

### 4. Race Conditions
- Problem: an instance writes its file before its port is ready
- Solution: write the file AFTER `net.Listen()`

### 5. File System Full
- Log the error and continue without discovery
- The instance runs standalone

## Security Considerations

1. **File Permissions**
   - Instance files: 0644 (owner read/write, readable by other local users)
   - Directories: 0755
   - No sensitive data in files, since other users can read them
2. **Path Validation**
   - Sanitize instance IDs (no path traversal)
   - Use `filepath.Clean` on all paths
3. **Resource Limits**
   - Limit the number of watched files
   - Bounded channels for events

## Next Steps

With discovery working:
1. Step 3: Implement connection management
2. Step 4: Add tool proxying
3. Step 5: Health monitoring

## Code Checklist

- [ ] Update instance server to register after listening
- [ ] Create instance watcher with fsnotify
- [ ] Implement atomic file writes
- [ ] Add discovery to hub server
- [ ] Update `instances/list` to return real data
- [ ] Add comprehensive tests
- [ ] Handle all edge cases
- [ ] Document security considerations

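The Path Validation item under Security Considerations asks for sanitized instance IDs. One way that check could look is sketched below using only the standard library; the helper name `instanceFilePath` and the allowlist (hex digits and hyphens, enough for a UUID-v4) are assumptions for illustration, not part of the Brummer codebase.

```go
package main

import (
	"fmt"
	"path/filepath"
	"regexp"
)

// validInstanceID accepts only the characters a UUID-v4 string uses
// (hex digits and hyphens). This allowlist is an assumption: the design
// only says IDs are "uuid-v4".
var validInstanceID = regexp.MustCompile(`^[0-9a-fA-F-]{1,64}$`)

// instanceFilePath is a hypothetical helper that maps an instance ID to its
// JSON file inside dir, rejecting IDs that could escape the directory.
func instanceFilePath(dir, id string) (string, error) {
	if !validInstanceID.MatchString(id) {
		return "", fmt.Errorf("invalid instance id %q", id)
	}
	cleanDir := filepath.Clean(dir)
	p := filepath.Join(cleanDir, id+".json")
	// Defense in depth: the result must still live directly inside dir.
	if filepath.Dir(p) != cleanDir {
		return "", fmt.Errorf("instance id %q escapes %s", id, dir)
	}
	return p, nil
}

func main() {
	ids := []string{"0b7c1c7e-8f1d-4f7a-9a53-2f5d0c3e9b11", "../../etc/passwd", ""}
	for _, id := range ids {
		p, err := instanceFilePath("/tmp/brummer/instances", id)
		fmt.Println(p, err)
	}
}
```

An allowlist is stricter than cleaning the path afterwards: `filepath.Clean` keeps a leading `../`, so joining a cleaned `../x` still lands outside the instances directory. That is why the sketch rejects such IDs outright and also checks the joined path's parent.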

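Edge Case 2 says invalid JSON files should be skipped and logged without crashing the watcher. As a hedged sketch, a decoder like the one below could sit behind `readInstanceFile` so malformed or unusable records are rejected with a descriptive error. The `Instance` struct here only mirrors the keys from the File Structure example, and `parseInstance` with its validation rules (non-empty `id`, port in 1-65535) is an illustrative assumption rather than the real `discovery` package.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"time"
)

// Instance mirrors the JSON keys shown under "File Structure". The real
// discovery.Instance type may differ; this copy is for illustration only.
type Instance struct {
	ID             string    `json:"id"`
	PID            int       `json:"pid"`
	Path           string    `json:"path"`
	Port           int       `json:"port"`
	Started        time.Time `json:"started"`
	Name           string    `json:"name"`
	HasPackageJSON bool      `json:"has_package_json"`
	LastSeen       time.Time `json:"last_seen"`
}

// parseInstance is a hypothetical helper: it decodes an instance file and
// rejects records the hub could not connect to, so the watcher can skip and
// log them instead of registering them.
func parseInstance(data []byte) (*Instance, error) {
	var inst Instance
	if err := json.Unmarshal(data, &inst); err != nil {
		return nil, fmt.Errorf("decode instance file: %w", err)
	}
	if inst.ID == "" {
		return nil, errors.New("instance file has no id")
	}
	if inst.Port < 1 || inst.Port > 65535 {
		return nil, fmt.Errorf("instance %s has invalid port %d", inst.ID, inst.Port)
	}
	return &inst, nil
}

func main() {
	good := []byte(`{"id":"uuid-v4","pid":12345,"path":"/home/user/project","port":7778,
		"started":"2024-01-01T00:00:00Z","name":"my-project","has_package_json":true,
		"last_seen":"2024-01-01T00:00:00Z"}`)
	inst, err := parseInstance(good)
	fmt.Println(inst.Name, inst.Port, err) // my-project 7778 <nil>

	_, err = parseInstance([]byte(`{"id":"x","port":0}`))
	fmt.Println(err) // instance x has invalid port 0

	_, err = parseInstance([]byte(`{not json`))
	fmt.Println(err != nil) // true
}
```

Validating the port here, rather than when the hub later dials the instance, keeps an unusable file from ever reaching `instances/list`.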