Skip to main content
Glama
server.go49 kB
package proxy import ( "bytes" "compress/gzip" _ "embed" "encoding/base64" "encoding/json" "fmt" "io" "log" "net/http" "net/http/httputil" "net/url" "strings" "sync" "sync/atomic" "time" "github.com/elazarl/goproxy" "github.com/gorilla/websocket" "github.com/standardbeagle/brummer/pkg/events" "github.com/standardbeagle/brummer/pkg/ports" ) // Request represents a proxied HTTP request with its metadata type Request struct { ID string Method string URL string Host string Path string StatusCode int StartTime time.Time Duration time.Duration Size int64 Error string ProcessName string // Telemetry data SessionID string HasTelemetry bool Telemetry *PageSession // Link to telemetry session if available // Authentication data HasAuth bool // True if request has Authorization header AuthType string // Type of auth (Bearer, Basic, etc.) JWTClaims map[string]interface{} // Decoded JWT claims if present JWTError string // JWT decoding error if any // Error tracking IsError bool // True if status code is 4xx or 5xx // Request type IsXHR bool // True if X-Requested-With: XMLHttpRequest header present ContentType string // Response Content-Type header } // ProxyMode defines the proxy operation mode type ProxyMode string const ( ProxyModeFull ProxyMode = "full" // Traditional HTTP proxy ProxyModeReverse ProxyMode = "reverse" // Reverse proxy for detected URLs ) // URLMapping represents a reverse proxy mapping type URLMapping struct { TargetURL string // e.g., "http://localhost:3000" ProxyPort int // e.g., 8889 ProxyURL string // e.g., "http://localhost:8889" ProcessName string Label string // e.g., "Frontend", "API", extracted from log context CreatedAt time.Time Server *http.Server // The HTTP server for this mapping ReverseProxy *httputil.ReverseProxy // The reverse proxy instance for this mapping } //go:embed monitor.js var monitoringScript string // Server manages the HTTP proxy server type Server struct { port int mode ProxyMode proxy *goproxy.ProxyHttpServer server *http.Server eventBus *events.EventBus // Separate locks for different concerns to prevent deadlocks dataMu sync.RWMutex // Protects data structures (requests, urlMap, etc.) serverMu sync.Mutex // Protects server start/stop operations only requests []Request urlMap map[string]string // Maps URL to process name // Reverse proxy specific fields urlMappings map[string]*URLMapping // Maps target URL to mapping nextPort int // Next available port for reverse proxy basePort int // Base port for reverse proxy mode // Telemetry telemetry *TelemetryStore enableTelemetry bool // WebSocket connections for real-time telemetry wsUpgrader websocket.Upgrader wsClients map[*websocket.Conn]bool // Lock-free atomic state for running status running int64 // 0 = stopped, 1 = running (use atomic operations) } // createSilentLogger creates a logger that discards all output to prevent // HTTP server errors from appearing in stdout/stderr during TUI mode func (s *Server) createSilentLogger() *log.Logger { return log.New(io.Discard, "", 0) } // decodeJWT attempts to decode a JWT token without verification // This is for display purposes only - we're not validating signatures func decodeJWT(tokenString string) (map[string]interface{}, error) { // JWT has 3 parts separated by dots parts := strings.Split(tokenString, ".") if len(parts) != 3 { return nil, fmt.Errorf("invalid JWT format: expected 3 parts, got %d", len(parts)) } // Decode the payload (second part) payload := parts[1] // Add padding if necessary switch len(payload) % 4 { case 2: payload += "==" case 3: payload += "=" } // Decode base64 decoded, err := base64.URLEncoding.DecodeString(payload) if err != nil { return nil, fmt.Errorf("failed to decode JWT payload: %v", err) } // Parse JSON var claims map[string]interface{} if err := json.Unmarshal(decoded, &claims); err != nil { return nil, fmt.Errorf("failed to parse JWT claims: %v", err) } return claims, nil } // extractAuthInfo extracts authentication information from request headers func extractAuthInfo(r *http.Request) (hasAuth bool, authType string, jwtClaims map[string]interface{}, jwtError string) { authHeader := r.Header.Get("Authorization") if authHeader == "" { return false, "", nil, "" } hasAuth = true // Parse auth type and token parts := strings.SplitN(authHeader, " ", 2) if len(parts) < 2 { authType = "Unknown" return } authType = parts[0] token := parts[1] // If it's a Bearer token, try to decode as JWT if strings.EqualFold(authType, "Bearer") { claims, err := decodeJWT(token) if err != nil { jwtError = err.Error() } else { jwtClaims = claims } } return } // NewServer creates a new proxy server func NewServer(port int, eventBus *events.EventBus) *Server { return NewServerWithMode(port, ProxyModeFull, eventBus) } // NewServerWithMode creates a new proxy server with specified mode func NewServerWithMode(port int, mode ProxyMode, eventBus *events.EventBus) *Server { s := &Server{ port: port, mode: mode, eventBus: eventBus, requests: make([]Request, 0, 1000), urlMap: make(map[string]string), urlMappings: make(map[string]*URLMapping), basePort: port, nextPort: port + 1000, // Start allocating from port+1000 for reverse proxy URLs telemetry: NewTelemetryStore(), enableTelemetry: true, wsUpgrader: websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { return true // Allow all origins for development }, }, wsClients: make(map[*websocket.Conn]bool), } if mode == ProxyModeFull { proxy := goproxy.NewProxyHttpServer() proxy.Verbose = false proxy.Logger = s.createSilentLogger() s.proxy = proxy s.setupHandlers() } return s } // setupHandlers configures the proxy request/response handlers func (s *Server) setupHandlers() { // Handle requests s.proxy.OnRequest().DoFunc(func(r *http.Request, ctx *goproxy.ProxyCtx) (*http.Request, *http.Response) { startTime := time.Now() // Generate request ID reqID := fmt.Sprintf("%d", ctx.Session) // Get the process name for this URL processName := s.getProcessForURL(r.URL.String()) // Extract authentication info hasAuth, authType, jwtClaims, jwtError := extractAuthInfo(r) // Store request info in context ctx.UserData = &Request{ ID: reqID, Method: r.Method, URL: r.URL.String(), Host: r.Host, Path: r.URL.Path, StartTime: startTime, ProcessName: processName, HasAuth: hasAuth, AuthType: authType, JWTClaims: jwtClaims, JWTError: jwtError, IsXHR: r.Header.Get("X-Requested-With") == "XMLHttpRequest", } return r, nil }) // Handle responses s.proxy.OnResponse().DoFunc(func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response { if ctx.UserData != nil { req := ctx.UserData.(*Request) req.Duration = time.Since(req.StartTime) req.StatusCode = resp.StatusCode req.ContentType = resp.Header.Get("Content-Type") // Check if this is an error response req.IsError = resp.StatusCode >= 400 // Get response size if resp.ContentLength > 0 { req.Size = resp.ContentLength } // Store the request s.addRequest(*req) // Publish event s.eventBus.Publish(events.Event{ Type: events.EventType("proxy.request"), ProcessID: req.ProcessName, Data: map[string]interface{}{ "method": req.Method, "url": req.URL, "status": req.StatusCode, "duration": req.Duration.Milliseconds(), "size": req.Size, "processName": req.ProcessName, }, }) // Inject monitoring script into HTML responses if s.enableTelemetry && resp != nil && resp.StatusCode == 200 { resp = s.injectMonitoringScript(resp, req.ProcessName) } } return resp }) // Handle errors s.proxy.OnResponse(goproxy.StatusCodeIs(0)).DoFunc(func(resp *http.Response, ctx *goproxy.ProxyCtx) *http.Response { if ctx.UserData != nil { req := ctx.UserData.(*Request) req.Duration = time.Since(req.StartTime) req.Error = "Connection failed" // Store the failed request s.addRequest(*req) } return resp }) } // injectMonitoringScript injects the monitoring JavaScript into HTML responses func (s *Server) injectMonitoringScript(resp *http.Response, processName string) *http.Response { // Check if response is HTML contentType := resp.Header.Get("Content-Type") if !strings.Contains(strings.ToLower(contentType), "text/html") { return resp } // Check if this is an AJAX/fetch request - skip injection for those if s.isBackgroundRequest(resp.Request) { return resp } // Read the response body body, err := io.ReadAll(resp.Body) if err != nil { return resp } resp.Body.Close() // Decompress if needed encoding := resp.Header.Get("Content-Encoding") if encoding == "gzip" { reader, err := gzip.NewReader(bytes.NewReader(body)) if err != nil { resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } body, err = io.ReadAll(reader) reader.Close() if err != nil { resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } } // Convert body to string for manipulation bodyStr := string(body) // Check if script is already injected if strings.Contains(bodyStr, "<!-- Brummer Monitoring Script -->") { // Script already injected, return response as-is resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } // Create the injection script with metadata injectionScript := fmt.Sprintf(` <!-- Brummer Monitoring Script --> <script> // Set process name and proxy host for telemetry window.__brummerProcessName = '%s'; window.__brummerProxyHost = 'localhost:%d'; </script> <script> %s </script> <!-- End Brummer Monitoring Script --> `, processName, s.port, monitoringScript) // Try to inject before </body> or </html> injected := false for _, tag := range []string{"</body>", "</html>"} { if idx := strings.LastIndex(strings.ToLower(bodyStr), tag); idx != -1 { bodyStr = bodyStr[:idx] + injectionScript + bodyStr[idx:] injected = true break } } // If no suitable tag found, append to the end if !injected { bodyStr += injectionScript } // Update body newBody := []byte(bodyStr) // Re-compress if needed if encoding == "gzip" { var buf bytes.Buffer gw := gzip.NewWriter(&buf) _, err = gw.Write(newBody) gw.Close() if err == nil { newBody = buf.Bytes() } } // Update response resp.Body = io.NopCloser(bytes.NewReader(newBody)) resp.ContentLength = int64(len(newBody)) resp.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBody))) // Remove content security policy that might block our script resp.Header.Del("Content-Security-Policy") resp.Header.Del("X-Content-Security-Policy") return resp } // injectMonitoringScriptForMapping injects telemetry script using mapping-specific port func (s *Server) injectMonitoringScriptForMapping(resp *http.Response, mapping *URLMapping, req *http.Request) *http.Response { // Check content type contentType := resp.Header.Get("Content-Type") if !strings.Contains(strings.ToLower(contentType), "text/html") { return resp } // Skip injection for XHR/AJAX requests if req != nil { // Check for common AJAX headers if req.Header.Get("X-Requested-With") == "XMLHttpRequest" { return resp } // Check for Fetch API requests if req.Header.Get("Sec-Fetch-Mode") == "cors" || req.Header.Get("Sec-Fetch-Dest") == "empty" { return resp } // Check Accept header for non-HTML requests accept := req.Header.Get("Accept") if accept != "" && !strings.Contains(accept, "text/html") && !strings.Contains(accept, "*/*") { return resp } } // Read the response body body, err := io.ReadAll(resp.Body) if err != nil { return resp } resp.Body.Close() // Decompress if needed encoding := resp.Header.Get("Content-Encoding") if encoding == "gzip" { reader, err := gzip.NewReader(bytes.NewReader(body)) if err != nil { resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } body, err = io.ReadAll(reader) reader.Close() if err != nil { resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } } // Convert body to string for manipulation bodyStr := string(body) // Check if script is already injected if strings.Contains(bodyStr, "<!-- Brummer Monitoring Script -->") { // Script already injected, return response as-is resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } // Create the injection script with metadata - use mapping's proxy port, not control port injectionScript := fmt.Sprintf(` <!-- Brummer Monitoring Script --> <script> // Set process name and proxy host for telemetry window.__brummerProcessName = '%s'; window.__brummerProxyHost = 'localhost:%d'; </script> <script> %s </script> <!-- End Brummer Monitoring Script --> `, mapping.ProcessName, mapping.ProxyPort, monitoringScript) // Try to inject before </body> or </html> injected := false for _, tag := range []string{"</body>", "</html>"} { if idx := strings.LastIndex(strings.ToLower(bodyStr), tag); idx != -1 { bodyStr = bodyStr[:idx] + injectionScript + bodyStr[idx:] injected = true break } } // If no suitable tag found, append to the end if !injected { bodyStr += injectionScript } // Update body newBody := []byte(bodyStr) // Re-compress if needed if encoding == "gzip" { var buf bytes.Buffer gw := gzip.NewWriter(&buf) _, err = gw.Write(newBody) gw.Close() if err == nil { newBody = buf.Bytes() } } // Update response resp.Body = io.NopCloser(bytes.NewReader(newBody)) resp.ContentLength = int64(len(newBody)) resp.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBody))) // Remove content security policy that might block our script resp.Header.Del("Content-Security-Policy") resp.Header.Del("X-Content-Security-Policy") return resp } // rewriteURLsInResponse rewrites URLs in HTML responses to use the proxy func (s *Server) rewriteURLsInResponse(resp *http.Response, mapping *URLMapping) *http.Response { // Check if response is HTML - be more flexible with content type detection contentType := resp.Header.Get("Content-Type") contentTypeLower := strings.ToLower(contentType) isHTML := strings.Contains(contentTypeLower, "text/html") || strings.Contains(contentTypeLower, "text/plain") || // Some servers serve HTML as text/plain contentType == "" // If no content type, assume it might be HTML if !isHTML { // Log for debugging - this might be why rewriting isn't happening // URL rewriting skipped for non-HTML content return resp } // URL rewriting in progress // Read the response body body, err := io.ReadAll(resp.Body) if err != nil { return resp } resp.Body.Close() // Handle gzip encoding encoding := resp.Header.Get("Content-Encoding") if encoding == "gzip" { reader, err := gzip.NewReader(bytes.NewReader(body)) if err != nil { // Not gzip encoded despite header, use as-is } else { body, err = io.ReadAll(reader) reader.Close() if err != nil { return resp } } } // Convert body to string for manipulation bodyStr := string(body) // Parse target URL to extract domain and port targetURL, err := url.Parse(mapping.TargetURL) if err != nil { resp.Body = io.NopCloser(bytes.NewReader(body)) return resp } // Extract host (domain:port) from target URL - handles all URL forms including user:pass@domain:port targetHost := targetURL.Host proxyHost := fmt.Sprintf("localhost:%d", mapping.ProxyPort) // Replacing host in HTML content // Simple host replacement - works for all URL variations if targetHost != "" { bodyStr = strings.ReplaceAll(bodyStr, targetHost, proxyHost) } // Update body newBody := []byte(bodyStr) // Re-compress if needed if encoding == "gzip" { var buf bytes.Buffer gw := gzip.NewWriter(&buf) _, err = gw.Write(newBody) gw.Close() if err == nil { newBody = buf.Bytes() } } // Update response resp.Body = io.NopCloser(bytes.NewReader(newBody)) resp.ContentLength = int64(len(newBody)) resp.Header.Set("Content-Length", fmt.Sprintf("%d", len(newBody))) return resp } // getProcessForURL returns the process name associated with a URL func (s *Server) getProcessForURL(urlStr string) string { s.dataMu.RLock() defer s.dataMu.RUnlock() normalized := normalizeURL(urlStr) // Try exact match first if process, ok := s.urlMap[normalized]; ok { return process } // Try to match by host if u, err := url.Parse(urlStr); err == nil { for mappedURL, process := range s.urlMap { if mu, err := url.Parse(mappedURL); err == nil { if u.Host == mu.Host { return process } } } } return "unknown" } // normalizeURL normalizes a URL for consistent mapping func normalizeURL(urlStr string) string { u, err := url.Parse(urlStr) if err != nil { return urlStr } // Remove trailing slashes from path path := strings.TrimSuffix(u.Path, "/") if path == "" { path = "/" } // Rebuild URL without query params for mapping normalized := fmt.Sprintf("%s://%s%s", u.Scheme, u.Host, path) return normalized } // addRequest stores a request in the history func (s *Server) addRequest(req Request) { s.dataMu.Lock() defer s.dataMu.Unlock() // Try to find telemetry session for this URL if s.telemetry != nil { sessions := s.telemetry.GetSessionsForURL(req.URL) if len(sessions) > 0 { // Link to the most recent session for this URL req.SessionID = sessions[0].SessionID req.HasTelemetry = true req.Telemetry = sessions[0] } } s.requests = append(s.requests, req) // Keep only last 1000 requests if len(s.requests) > 1000 { s.requests = s.requests[len(s.requests)-1000:] } } // linkTelemetryToRequests links telemetry data to existing requests func (s *Server) linkTelemetryToRequests(sessionID string) { s.dataMu.Lock() defer s.dataMu.Unlock() session, exists := s.telemetry.GetSession(sessionID) if !exists || session == nil { return } // Find requests that match this session's URL and link them sessionURL := session.URL if sessionURL == "" { return } // Normalize the session URL for comparison normalizedSessionURL := normalizeURL(sessionURL) for i := range s.requests { req := &s.requests[i] // Skip if already has telemetry if req.HasTelemetry { continue } // Check if URLs match (normalized comparison) normalizedReqURL := normalizeURL(req.URL) if normalizedReqURL == normalizedSessionURL || req.URL == sessionURL { req.SessionID = sessionID req.HasTelemetry = true req.Telemetry = session // Telemetry session linked to request } } } // GetRequests returns all stored requests func (s *Server) GetRequests() []Request { s.dataMu.RLock() defer s.dataMu.RUnlock() // Return a copy to avoid race conditions requests := make([]Request, len(s.requests)) copy(requests, s.requests) return requests } // GetRequestsForProcess returns requests for a specific process func (s *Server) GetRequestsForProcess(processName string) []Request { s.dataMu.RLock() defer s.dataMu.RUnlock() var filtered []Request for _, req := range s.requests { if req.ProcessName == processName { filtered = append(filtered, req) } } return filtered } // Start starts the proxy server func (s *Server) Start() error { // Use separate mutex for server operations to prevent deadlock s.serverMu.Lock() defer s.serverMu.Unlock() // Check if already running using atomic operation (lock-free) if atomic.LoadInt64(&s.running) == 1 { return fmt.Errorf("proxy server already running") } // Try to find an available port, starting from the requested port availablePort, err := ports.FindAvailablePort(s.port) if err != nil { return fmt.Errorf("failed to find available port: %w", err) } // Update the port if it changed if availablePort != s.port { s.port = availablePort } // In full proxy mode, ensure goproxy is initialized if s.mode == ProxyModeFull && s.proxy == nil { proxy := goproxy.NewProxyHttpServer() proxy.Verbose = false proxy.Logger = s.createSilentLogger() s.proxy = proxy s.setupHandlers() } // Create a custom handler that serves PAC file and proxies everything else addr := fmt.Sprintf(":%d", s.port) server := &http.Server{ Addr: addr, ErrorLog: s.createSilentLogger(), Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle PAC file requests if r.URL.Path == "/proxy.pac" || r.URL.Path == "/pac" { s.servePACFile(w, r) return } // Handle telemetry endpoint if r.URL.Path == "/__brummer_telemetry__" && r.Method == "POST" { s.handleTelemetry(w, r) return } // Handle WebSocket telemetry endpoint if r.URL.Path == "/__brummer_ws__" { s.handleWebSocketTelemetry(w, r) return } // Handle direct browsing to proxy server (not proxy requests) if r.Header.Get("Host") == r.Host && (r.URL.Path == "/" || r.URL.Path == "") { fmt.Fprintf(w, "Brummer Proxy Server\n\n") fmt.Fprintf(w, "Mode: %s\n", s.mode) fmt.Fprintf(w, "PAC File: http://localhost:%d/proxy.pac\n\n", s.port) fmt.Fprintf(w, "Configure your browser's automatic proxy configuration URL to:\n") fmt.Fprintf(w, "http://localhost:%d/proxy.pac\n", s.port) return } // In reverse proxy mode, we don't handle proxy requests on the main port if s.mode == ProxyModeReverse { http.Error(w, "This is the control port. Use the dedicated proxy ports for each URL.", http.StatusBadRequest) return } // All proxy requests go through goproxy (full proxy mode only) if s.proxy != nil { s.proxy.ServeHTTP(w, r) } else { http.Error(w, "Proxy not initialized", http.StatusInternalServerError) } }), } // Assign server safely under lock s.dataMu.Lock() s.server = server s.dataMu.Unlock() // Set running state atomically (lock-free) atomic.StoreInt64(&s.running, 1) // Start server in goroutine without holding any locks go func() { // Proxy server starting (logs disabled for TUI compatibility) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { // Proxy server error - set state atomically (logged internally) atomic.StoreInt64(&s.running, 0) } }() return nil } // Stop stops the proxy server func (s *Server) Stop() error { // Use separate mutex for server operations to prevent deadlock s.serverMu.Lock() defer s.serverMu.Unlock() // Check if already stopped using atomic operation (lock-free) if atomic.LoadInt64(&s.running) == 0 { return nil } // Set stopped state atomically (lock-free) atomic.StoreInt64(&s.running, 0) // In reverse proxy mode, stop all individual servers (need data lock for urlMappings) if s.mode == ProxyModeReverse { s.dataMu.RLock() mappings := make([]*URLMapping, 0, len(s.urlMappings)) for _, mapping := range s.urlMappings { mappings = append(mappings, mapping) } s.dataMu.RUnlock() // Close servers outside the data lock to prevent blocking for _, mapping := range mappings { if mapping.Server != nil { // Stopping reverse proxy (logged internally) mapping.Server.Close() } } } // Get server safely under lock s.dataMu.RLock() server := s.server s.dataMu.RUnlock() if server != nil { return server.Close() } return nil } // IsRunning returns whether the proxy server is running func (s *Server) IsRunning() bool { return atomic.LoadInt64(&s.running) == 1 } // ClearRequests clears all stored requests func (s *Server) ClearRequests() { s.dataMu.Lock() defer s.dataMu.Unlock() s.requests = make([]Request, 0, 1000) // Note: We don't stop the proxy servers, just clear the request history } // ClearRequestsForProcess clears requests for a specific process func (s *Server) ClearRequestsForProcess(processName string) { s.dataMu.Lock() defer s.dataMu.Unlock() var filtered []Request for _, req := range s.requests { if req.ProcessName != processName { filtered = append(filtered, req) } } s.requests = filtered } // GetPort returns the proxy server port func (s *Server) GetPort() int { return s.port } // GetMode returns the proxy server mode func (s *Server) GetMode() ProxyMode { return s.mode } // SwitchMode switches the proxy server between full and reverse mode func (s *Server) SwitchMode(newMode ProxyMode) error { s.dataMu.Lock() defer s.dataMu.Unlock() if s.mode == newMode { return nil // Already in the requested mode } oldMode := s.mode wasRunning := atomic.LoadInt64(&s.running) == 1 // Stop the server first if wasRunning { atomic.StoreInt64(&s.running, 0) // Get server safely under lock s.dataMu.RLock() server := s.server s.dataMu.RUnlock() if server != nil { server.Close() } // In reverse proxy mode, stop all individual servers if oldMode == ProxyModeReverse { for _, mapping := range s.urlMappings { if mapping.Server != nil { // Stopping reverse proxy for mode switch (logged internally) mapping.Server.Close() } } } } // Clear existing proxy setup s.proxy = nil // Switch mode s.mode = newMode // Clear URL mappings when switching away from reverse mode if oldMode == ProxyModeReverse && newMode == ProxyModeFull { // Clear the individual reverse proxy servers s.urlMappings = make(map[string]*URLMapping) // Reset next port s.nextPort = s.basePort + 1000 } // Set up new mode if newMode == ProxyModeFull { proxy := goproxy.NewProxyHttpServer() proxy.Verbose = false proxy.Logger = s.createSilentLogger() s.proxy = proxy s.setupHandlers() } // Restart if it was running if wasRunning { // Create new server with updated handler newServer := &http.Server{ Addr: fmt.Sprintf(":%d", s.port), ErrorLog: s.createSilentLogger(), Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Handle PAC file requests if r.URL.Path == "/proxy.pac" || r.URL.Path == "/pac" { s.servePACFile(w, r) return } // Handle telemetry endpoint if r.URL.Path == "/__brummer_telemetry__" && r.Method == "POST" { s.handleTelemetry(w, r) return } // Handle WebSocket telemetry endpoint if r.URL.Path == "/__brummer_ws__" { s.handleWebSocketTelemetry(w, r) return } // Handle direct browsing to proxy server (not proxy requests) if r.Header.Get("Host") == r.Host && (r.URL.Path == "/" || r.URL.Path == "") { fmt.Fprintf(w, "Brummer Proxy Server\n\n") fmt.Fprintf(w, "Mode: %s\n", s.mode) fmt.Fprintf(w, "PAC File: http://localhost:%d/proxy.pac\n\n", s.port) fmt.Fprintf(w, "Configure your browser's automatic proxy configuration URL to:\n") fmt.Fprintf(w, "http://localhost:%d/proxy.pac\n", s.port) return } // In reverse proxy mode, we don't handle proxy requests on the main port if s.mode == ProxyModeReverse { http.Error(w, "This is the control port. Use the dedicated proxy ports for each URL.", http.StatusBadRequest) return } // All proxy requests go through goproxy (full proxy mode only) if s.proxy != nil { s.proxy.ServeHTTP(w, r) } else { http.Error(w, "Proxy not initialized", http.StatusInternalServerError) } }), } // Assign server safely under lock s.dataMu.Lock() s.server = newServer s.dataMu.Unlock() atomic.StoreInt64(&s.running, 1) go func() { // Restarting proxy server (logs disabled for TUI compatibility) if err := newServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { // Proxy server error - set state atomically (logged internally) atomic.StoreInt64(&s.running, 0) } }() // Publish mode switch event s.eventBus.Publish(events.Event{ Type: events.EventType("proxy.mode_switched"), Data: map[string]interface{}{ "old_mode": string(oldMode), "new_mode": string(newMode), }, }) } return nil } // createReverseProxyHandler creates an HTTP handler for reverse proxy mode func (s *Server) createReverseProxyHandler() http.Handler { // In reverse proxy mode, we don't use the main port // Instead, each URL gets its own port mux := http.NewServeMux() mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "Reverse proxy mode active. Each URL gets its own port.\n") }) return mux } // createURLProxyHandler creates an httputil.ReverseProxy-based handler for a specific URL mapping func (s *Server) createURLProxyHandler(mapping *URLMapping) http.Handler { // Parse target URL targetURL, err := url.Parse(mapping.TargetURL) if err != nil { // Invalid target URL for mapping (logged internally) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { http.Error(w, "Invalid target URL configuration", http.StatusInternalServerError) }) } // Create reverse proxy with proper URL rewriting rp := &httputil.ReverseProxy{ ErrorLog: s.createSilentLogger(), Director: func(req *http.Request) { // Store original URL for logging originalURL := fmt.Sprintf("http://%s%s", req.Host, req.URL.RequestURI()) req.Header.Set("X-Original-URL", originalURL) // Rewrite the request to target the backend req.URL.Scheme = targetURL.Scheme req.URL.Host = targetURL.Host req.Host = targetURL.Host req.Header.Set("X-Forwarded-Host", req.Header.Get("Host")) req.Header.Set("X-Forwarded-Proto", "http") }, ModifyResponse: func(resp *http.Response) error { // Track the request startTime := time.Now() reqID := fmt.Sprintf("%d", time.Now().UnixNano()) originalURL := resp.Request.Header.Get("X-Original-URL") if originalURL == "" { originalURL = resp.Request.URL.String() } // Extract authentication info hasAuth, authType, jwtClaims, jwtError := extractAuthInfo(resp.Request) // Create request record reqRecord := Request{ ID: reqID, Method: resp.Request.Method, URL: originalURL, Host: resp.Request.Host, Path: resp.Request.URL.Path, StartTime: startTime, ProcessName: mapping.ProcessName, StatusCode: resp.StatusCode, IsError: resp.StatusCode >= 400, HasAuth: hasAuth, AuthType: authType, JWTClaims: jwtClaims, JWTError: jwtError, IsXHR: resp.Request.Header.Get("X-Requested-With") == "XMLHttpRequest", ContentType: resp.Header.Get("Content-Type"), } if resp.ContentLength > 0 { reqRecord.Size = resp.ContentLength } // Store the request s.addRequest(reqRecord) // Publish event s.eventBus.Publish(events.Event{ Type: events.EventType("proxy.request"), ProcessID: reqRecord.ProcessName, Data: map[string]interface{}{ "method": reqRecord.Method, "url": reqRecord.URL, "status": reqRecord.StatusCode, "duration": reqRecord.Duration.Milliseconds(), "size": reqRecord.Size, "processName": reqRecord.ProcessName, }, }) // Inject monitoring script and rewrite URLs for HTML responses if resp.StatusCode == 200 { if s.enableTelemetry { s.injectMonitoringScriptForMapping(resp, mapping, resp.Request) } s.rewriteURLsInResponse(resp, mapping) } return nil }, } mapping.ReverseProxy = rp // Handle telemetry endpoint directly mux := http.NewServeMux() mux.HandleFunc("/__brummer_telemetry__", func(w http.ResponseWriter, r *http.Request) { if r.Method == "POST" || r.Method == "OPTIONS" { s.handleTelemetry(w, r) } else { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) } }) // Handle WebSocket telemetry endpoint directly mux.HandleFunc("/__brummer_ws__", func(w http.ResponseWriter, r *http.Request) { s.handleWebSocketTelemetry(w, r) }) // For all other requests, use the reverse proxy mux.HandleFunc("/", rp.ServeHTTP) return mux } // RegisterURL associates a URL with a process name and returns the proxy URL func (s *Server) RegisterURL(urlStr, processName string) string { return s.RegisterURLWithLabel(urlStr, processName, processName) } // RegisterURLWithLabel associates a URL with a process name and label, and returns the proxy URL func (s *Server) RegisterURLWithLabel(urlStr, processName, label string) string { s.dataMu.Lock() defer s.dataMu.Unlock() // Normalize URL normalized := normalizeURL(urlStr) s.urlMap[normalized] = processName // In reverse proxy mode, create a separate server for each URL if s.mode == ProxyModeReverse { // Check if we already have a mapping for this URL if existing, exists := s.urlMappings[normalized]; exists { // Update label if it's different and more descriptive than current if label != processName && (existing.Label == existing.ProcessName || existing.Label == "") { existing.Label = label } return existing.ProxyURL } // Allocate a new port starting from nextPort port, err := ports.FindAvailablePort(s.nextPort) if err != nil { return normalized // Return original URL if we can't allocate a port } s.nextPort = port + 1 // Update nextPort for next allocation // Create mapping mapping := &URLMapping{ TargetURL: normalized, ProxyPort: port, ProxyURL: fmt.Sprintf("http://localhost:%d", port), ProcessName: processName, Label: label, CreatedAt: time.Now(), } // Create and start a new server for this URL handler := s.createURLProxyHandler(mapping) server := &http.Server{ Addr: fmt.Sprintf(":%d", port), ErrorLog: s.createSilentLogger(), Handler: handler, } mapping.Server = server // Start the server go func() { var msg string if mapping.Label != "" && mapping.Label != mapping.ProcessName { msg = fmt.Sprintf("%s started for %s (%s)", mapping.ProxyURL, normalized, mapping.Label) } else { msg = fmt.Sprintf("%s started for %s", mapping.ProxyURL, normalized) } // Publish event for TUI to show in system messages instead of stdout s.eventBus.Publish(events.Event{ Type: events.EventType("system.message"), Data: map[string]interface{}{ "level": "info", "context": "Proxy", "message": msg, }, }) if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { errorMsg := fmt.Sprintf("Reverse proxy error for %s: %v", normalized, err) s.eventBus.Publish(events.Event{ Type: events.EventType("system.message"), Data: map[string]interface{}{ "level": "error", "context": "Proxy", "message": errorMsg, }, }) } }() s.urlMappings[normalized] = mapping // Also register the proxy URL so telemetry can map it back to the process // This handles cases where telemetry comes from the proxied URL (localhost:20888) proxyURLNormalized := normalizeURL(mapping.ProxyURL) s.urlMap[proxyURLNormalized] = processName return mapping.ProxyURL } return urlStr } // GetProxyURL returns the proxy URL for a given target URL (reverse proxy mode only) func (s *Server) GetProxyURL(targetURL string) string { if s.mode != ProxyModeReverse { return targetURL } s.dataMu.RLock() defer s.dataMu.RUnlock() normalized := normalizeURL(targetURL) if mapping, exists := s.urlMappings[normalized]; exists { return mapping.ProxyURL } return targetURL } // GetURLMappings returns all URL mappings (reverse proxy mode only) func (s *Server) GetURLMappings() []URLMapping { s.dataMu.RLock() defer s.dataMu.RUnlock() mappings := make([]URLMapping, 0, len(s.urlMappings)) for _, mapping := range s.urlMappings { mappings = append(mappings, *mapping) } // Sort by creation time (newest first) for i := 0; i < len(mappings)-1; i++ { for j := i + 1; j < len(mappings); j++ { if mappings[j].CreatedAt.After(mappings[i].CreatedAt) { mappings[i], mappings[j] = mappings[j], mappings[i] } } } return mappings } // servePACFile serves the Proxy Auto-Configuration file func (s *Server) servePACFile(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/x-ns-proxy-autoconfig") // Generate PAC file that uses the proxy with fallback to direct pacContent := fmt.Sprintf(`// Brummer Proxy Auto-Configuration function FindProxyForURL(url, host) { // Use proxy for HTTP requests, with fallback to direct if (url.substring(0, 5) == "http:") { return "PROXY localhost:%d; DIRECT"; } // Use proxy for HTTPS requests, with fallback to direct if (url.substring(0, 6) == "https:") { return "PROXY localhost:%d; DIRECT"; } // Everything else goes direct return "DIRECT"; }`, s.port, s.port) fmt.Fprint(w, pacContent) } // GetPACURL returns the URL for the PAC file func (s *Server) GetPACURL() string { return fmt.Sprintf("http://localhost:%d/proxy.pac", s.port) } // handleTelemetry handles incoming telemetry data from the monitoring script func (s *Server) handleTelemetry(w http.ResponseWriter, r *http.Request) { // Enable CORS for telemetry endpoint w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS") w.Header().Set("Access-Control-Allow-Headers", "Content-Type") // Handle preflight if r.Method == "OPTIONS" { w.WriteHeader(http.StatusOK) return } // Read body body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "Failed to read request body", http.StatusBadRequest) return } defer r.Body.Close() // Parse telemetry batch var batch TelemetryBatch if err := json.Unmarshal(body, &batch); err != nil { http.Error(w, "Invalid telemetry data", http.StatusBadRequest) return } // Get process name from referer or use default processName := "unknown" referer := r.Header.Get("Referer") if referer != "" { processName = s.getProcessForURL(referer) } // Store telemetry data s.telemetry.AddBatch(batch, processName) // Retroactively link telemetry to existing requests s.linkTelemetryToRequests(batch.SessionID) // Broadcast to WebSocket clients s.SendTelemetryToWebSockets(batch, processName) // Publish telemetry event s.eventBus.Publish(events.Event{ Type: events.EventType("telemetry.received"), ProcessID: processName, Data: map[string]interface{}{ "sessionId": batch.SessionID, "eventCount": len(batch.Events), }, }) // Return success w.WriteHeader(http.StatusOK) fmt.Fprint(w, `{"status":"ok"}`) } // GetTelemetryStore returns the telemetry store func (s *Server) GetTelemetryStore() *TelemetryStore { return s.telemetry } // EnableTelemetry enables or disables telemetry collection func (s *Server) EnableTelemetry(enable bool) { // Just set it directly - bool writes are atomic s.enableTelemetry = enable } // IsTelemetryEnabled returns whether telemetry is enabled func (s *Server) IsTelemetryEnabled() bool { // Just read it directly - bool reads are atomic return s.enableTelemetry } // GetTelemetrySession returns telemetry data for a specific session func (s *Server) GetTelemetrySession(sessionID string) (*PageSession, bool) { return s.telemetry.GetSession(sessionID) } // GetTelemetryForProcess returns all telemetry sessions for a process func (s *Server) GetTelemetryForProcess(processName string) []*PageSession { return s.telemetry.GetSessionsForProcess(processName) } // ClearTelemetryForProcess clears telemetry data for a specific process func (s *Server) ClearTelemetryForProcess(processName string) { s.telemetry.ClearSessionsForProcess(processName) } // isBackgroundRequest checks if this is an AJAX/fetch request based on headers func (s *Server) isBackgroundRequest(req *http.Request) bool { if req == nil { return false } // Check for XMLHttpRequest header (used by jQuery and older AJAX libraries) if req.Header.Get("X-Requested-With") == "XMLHttpRequest" { return true } // Check Fetch metadata headers (modern browsers) fetchMode := req.Header.Get("Sec-Fetch-Mode") fetchDest := req.Header.Get("Sec-Fetch-Dest") // Skip injection for cors, no-cors, and same-origin modes (typically AJAX) // Only inject for navigate mode (actual page navigation) if fetchMode != "" && fetchMode != "navigate" { return true } // Skip injection for non-document destinations if fetchDest != "" && fetchDest != "document" { return true } // Check Accept header - if it's specifically asking for JSON or XML, skip accept := req.Header.Get("Accept") if strings.Contains(accept, "application/json") || strings.Contains(accept, "application/xml") || strings.Contains(accept, "text/xml") { return true } return false } // WebSocket message types type WSMessage struct { Type string `json:"type"` Data interface{} `json:"data"` Timestamp int64 `json:"timestamp"` } // handleWebSocketTelemetry handles WebSocket connections for real-time telemetry func (s *Server) handleWebSocketTelemetry(w http.ResponseWriter, r *http.Request) { // Upgrade HTTP connection to WebSocket conn, err := s.wsUpgrader.Upgrade(w, r, nil) if err != nil { // WebSocket upgrade failed (logged internally) return } defer conn.Close() // Register client s.dataMu.Lock() s.wsClients[conn] = true clientCount := len(s.wsClients) s.dataMu.Unlock() // WebSocket connection events are only for internal tracking, no need to log to stdout // Send welcome message welcomeMsg := WSMessage{ Type: "connected", Data: map[string]interface{}{ "message": "Connected to Brummer telemetry", "serverTime": time.Now().UnixMilli(), "clientCount": clientCount, }, Timestamp: time.Now().UnixMilli(), } conn.WriteJSON(welcomeMsg) // Handle incoming messages (for REPL commands) for { var msg WSMessage err := conn.ReadJSON(&msg) if err != nil { // WebSocket read error (logged internally) break } // Handle REPL commands s.handleWSCommand(conn, msg) } // Unregister client s.dataMu.Lock() delete(s.wsClients, conn) clientCount = len(s.wsClients) s.dataMu.Unlock() // WebSocket disconnection events are only for internal tracking, no need to log to stdout } // handleWSCommand processes incoming WebSocket commands for REPL functionality func (s *Server) handleWSCommand(conn *websocket.Conn, msg WSMessage) { response := WSMessage{ Type: "command_response", Timestamp: time.Now().UnixMilli(), } switch msg.Type { case "ping": response.Data = map[string]interface{}{ "pong": true, "serverTime": time.Now().UnixMilli(), } case "status": response.Data = map[string]interface{}{ "requests": len(s.requests), "telemetryEnabled": s.enableTelemetry, "mode": s.mode, "port": s.port, "sessions": s.telemetry.GetAllSessions(), } case "clear_buffer": response.Data = map[string]interface{}{ "message": "Buffer cleared", "cleared": true, } case "get_requests": limit := 100 // Default limit if data, ok := msg.Data.(map[string]interface{}); ok { if l, ok := data["limit"].(float64); ok { limit = int(l) } } requests := s.GetRequests() if len(requests) > limit { requests = requests[len(requests)-limit:] } response.Data = map[string]interface{}{ "requests": requests, "total": len(s.requests), "returned": len(requests), } case "get_telemetry": sessions := s.telemetry.GetAllSessions() response.Data = map[string]interface{}{ "sessions": sessions, "count": len(sessions), } case "queue_message": // Handle incoming message queue messages via WebSocket if data, ok := msg.Data.(map[string]interface{}); ok { // Forward to MCP server via event bus s.eventBus.Publish(events.Event{ Type: events.EventType("queue.message"), Data: data, }) response.Data = map[string]interface{}{ "status": "ok", "type": "queue_message_received", } } else { response.Data = map[string]interface{}{ "error": "Invalid queue message format", } } case "telemetry": // Handle incoming telemetry data via WebSocket if data, ok := msg.Data.(map[string]interface{}); ok { // Convert the data back to TelemetryBatch format batch := TelemetryBatch{} // Extract sessionId if sessionId, ok := data["sessionId"].(string); ok { batch.SessionID = sessionId } // Extract events if events, ok := data["events"].([]interface{}); ok { for _, event := range events { if eventMap, ok := event.(map[string]interface{}); ok { telemetryEvent := TelemetryEvent{} if eventType, ok := eventMap["type"].(string); ok { telemetryEvent.Type = TelemetryEventType(eventType) } if timestamp, ok := eventMap["timestamp"].(float64); ok { telemetryEvent.Timestamp = int64(timestamp) } if sessionId, ok := eventMap["sessionId"].(string); ok { telemetryEvent.SessionID = sessionId } if url, ok := eventMap["url"].(string); ok { telemetryEvent.URL = url } if data, ok := eventMap["data"].(map[string]interface{}); ok { telemetryEvent.Data = data } batch.Events = append(batch.Events, telemetryEvent) } } } // Determine process name from metadata or use default processName := "unknown" if metadata, ok := data["metadata"].(map[string]interface{}); ok { if urlStr, ok := metadata["url"].(string); ok { processName = s.getProcessForURL(urlStr) // If still unknown, try to extract from the URL host if processName == "unknown" { if u, err := url.Parse(urlStr); err == nil { // Check if this is one of our proxy URLs for proxyURL, mappedProcess := range s.urlMap { if pu, err := url.Parse(proxyURL); err == nil { if u.Host == pu.Host { processName = mappedProcess break } } } } } } } // Store telemetry data (same as HTTP handler) s.telemetry.AddBatch(batch, processName) // Retroactively link telemetry to existing requests s.linkTelemetryToRequests(batch.SessionID) // Broadcast to other WebSocket clients s.SendTelemetryToWebSockets(batch, processName) // Publish telemetry event s.eventBus.Publish(events.Event{ Type: events.EventType("telemetry.received"), ProcessID: processName, Data: map[string]interface{}{ "sessionId": batch.SessionID, "eventCount": len(batch.Events), "source": "websocket", }, }) response.Data = map[string]interface{}{ "status": "ok", "received": len(batch.Events), "sessionId": batch.SessionID, } } else { response.Data = map[string]interface{}{ "error": "Invalid telemetry data format", } } // Send response back to client conn.WriteJSON(response) return // Don't send another response below case "repl_response": // Handle REPL response from browser if data, ok := msg.Data.(map[string]interface{}); ok { responseID, hasID := data["responseId"].(string) if !hasID { response.Data = map[string]interface{}{ "error": "Missing responseId in REPL response", } } else { // Forward the response to MCP server via event bus s.eventBus.Publish(events.Event{ Type: events.EventType("repl.response"), Data: map[string]interface{}{ "responseId": responseID, "result": data["result"], "error": data["error"], }, }) response.Data = map[string]interface{}{ "status": "forwarded", "responseId": responseID, } } } else { response.Data = map[string]interface{}{ "error": "Invalid REPL response format", } } default: response.Data = map[string]interface{}{ "error": "Unknown command type: " + msg.Type, } } conn.WriteJSON(response) } // BroadcastToWebSockets sends a message to all connected WebSocket clients func (s *Server) BroadcastToWebSockets(msgType string, data interface{}) { s.dataMu.RLock() defer s.dataMu.RUnlock() if len(s.wsClients) == 0 { return } msg := WSMessage{ Type: msgType, Data: data, Timestamp: time.Now().UnixMilli(), } // Send to all clients for conn := range s.wsClients { err := conn.WriteJSON(msg) if err != nil { // WebSocket broadcast error (logged internally) // Note: Client cleanup now safe with unified mutex hierarchy } } } // SendTelemetryToWebSockets broadcasts telemetry data to WebSocket clients func (s *Server) SendTelemetryToWebSockets(batch TelemetryBatch, processName string) { s.BroadcastToWebSockets("telemetry", map[string]interface{}{ "batch": batch, "processName": processName, }) }

Latest Blog Posts

MCP directory API

We provide all the information about MCP servers via our MCP API.

curl -X GET 'https://glama.ai/api/mcp/v1/servers/standardbeagle/brummer'

If you have feedback or need assistance with the MCP directory API, please join our Discord server