package daemon
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"net"
"net/http"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"strconv"
"strings"
"time"
"github.com/standardbeagle/agnt/internal/automation"
"github.com/standardbeagle/agnt/internal/debug"
"github.com/standardbeagle/agnt/internal/project"
"github.com/standardbeagle/agnt/internal/proxy"
"github.com/standardbeagle/agnt/internal/tunnel"
hubpkg "github.com/standardbeagle/go-cli-server/hub"
goprocess "github.com/standardbeagle/go-cli-server/process"
hubproto "github.com/standardbeagle/go-cli-server/protocol"
// Alias for direct process access
process "github.com/standardbeagle/go-cli-server/process"
)
// writeErr writes an error response and logs it for debugging.
func writeErr(conn *hubpkg.Connection, code hubproto.ErrorCode, component, format string, args ...interface{}) error {
msg := fmt.Sprintf(format, args...)
debug.Log(component, "error: %s (code=%v)", msg, code)
return conn.WriteErr(code, msg)
}
// writeStructuredErr writes a structured error response and logs it for debugging.
func writeStructuredErr(conn *hubpkg.Connection, component string, err *hubproto.StructuredError) error {
debug.Log(component, "error: %s - %s (code=%v)", err.Command, err.Message, err.Code)
return conn.WriteStructuredErr(err)
}
// normalizePath normalizes a path for consistent comparison.
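// For example, on Windows "C:\Work\App" and "c:/work/app" both normalize to
// "c:\work\app"; on other platforms the path is only made absolute and cleaned.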
func normalizePath(path string) string {
if path == "" || path == "." {
return "."
}
abs, err := filepath.Abs(path)
if err != nil {
abs = filepath.Clean(path)
}
// On Windows, normalize to lowercase for case-insensitive comparison.
if runtime.GOOS == "windows" {
abs = strings.ToLower(abs)
}
return abs
}
// getSessionScopedProxy retrieves a proxy with session-scoped fuzzy matching.
// If the connection has an associated session, only proxies in that session's
// project path are considered for fuzzy lookup. Exact ID matches always work.
func (d *Daemon) getSessionScopedProxy(conn *hubpkg.Connection, proxyID string) (*proxy.ProxyServer, error) {
// Get path filter from connection's session
pathFilter := ""
if sessionCode := conn.SessionCode(); sessionCode != "" {
if session, ok := d.sessionRegistry.Get(sessionCode); ok {
pathFilter = session.ProjectPath
}
}
return d.proxym.GetWithPathFilter(proxyID, pathFilter)
}
// getSessionScopedTunnel retrieves a tunnel with session-scoped fuzzy matching.
// If the connection has an associated session, only tunnels in that session's
// project path are considered for fuzzy lookup. Exact ID matches always work.
func (d *Daemon) getSessionScopedTunnel(conn *hubpkg.Connection, tunnelID string) (*tunnel.Tunnel, error) {
// Get path filter from connection's session
pathFilter := ""
if sessionCode := conn.SessionCode(); sessionCode != "" {
if session, ok := d.sessionRegistry.Get(sessionCode); ok {
pathFilter = session.ProjectPath
}
}
return d.tunnelm.GetWithPathFilter(tunnelID, pathFilter)
}
// getSessionProjectPath returns the project path from the connection's session.
func (d *Daemon) getSessionProjectPath(conn *hubpkg.Connection) string {
if sessionCode := conn.SessionCode(); sessionCode != "" {
if session, ok := d.sessionRegistry.Get(sessionCode); ok {
return session.ProjectPath
}
}
return ""
}
// RogueProcessInfo contains information about a detected rogue process.
type RogueProcessInfo struct {
Port int // The port being used
PIDs []int // PIDs of processes using the port
IsManaged bool // Whether any of the PIDs are managed by agnt
HasWarning bool // Whether to show warning
}
// detectRogueProcess checks if there's an unmanaged process using the port
// associated with a stopped/failed process. Returns info about the rogue process.
func (d *Daemon) detectRogueProcess(ctx context.Context, proc *goprocess.ManagedProcess) *RogueProcessInfo {
// Only check stopped/failed processes
state := proc.State()
if state != goprocess.StateStopped && state != goprocess.StateFailed {
return nil
}
// Try to determine the expected port
port := d.getExpectedPortForProcess(proc)
if port <= 0 {
return nil
}
// Check whether the port is in use (lsof first, then ss as a fallback)
pids := findProcessesByPort(ctx, port)
if len(pids) == 0 {
return nil
}
// Check if any of these PIDs are managed by agnt
isManaged := false
for _, pid := range pids {
if d.hub.ProcessManager().IsManagedPID(pid) {
isManaged = true
break
}
}
// If port is in use by unmanaged process, return warning info
if !isManaged {
return &RogueProcessInfo{
Port: port,
PIDs: pids,
IsManaged: false,
HasWarning: true,
}
}
return nil
}
// findProcessesByPort finds PIDs of processes listening on the given port.
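// It shells out to lsof and then ss, so on platforms where neither tool is
// available (for example Windows) it returns nil.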
func findProcessesByPort(ctx context.Context, port int) []int {
// Try lsof first, restricted to listening TCP sockets to match the ss fallback below
cmd := exec.CommandContext(ctx, "lsof", "-t", "-i", fmt.Sprintf("tcp:%d", port), "-sTCP:LISTEN")
output, err := cmd.Output()
if err == nil && len(output) > 0 {
return parsePIDLines(strings.TrimSpace(string(output)))
}
// Fall back to ss
cmd = exec.CommandContext(ctx, "ss", "-tlnp")
output, err = cmd.Output()
if err != nil {
return nil
}
var pids []int
lines := strings.Split(string(output), "\n")
portPattern := fmt.Sprintf(":%d", port)
for _, line := range lines {
if !strings.Contains(line, portPattern) {
continue
}
// Extract PID from "pid=XXXX," pattern
start := strings.Index(line, "pid=")
if start == -1 {
continue
}
start += 4
end := strings.IndexAny(line[start:], ",)")
if end == -1 {
continue
}
var pid int
if _, err := fmt.Sscanf(line[start:start+end], "%d", &pid); err == nil {
pids = append(pids, pid)
}
}
return pids
}
// parsePIDLines parses newline-separated PID output from lsof.
func parsePIDLines(output string) []int {
if output == "" {
return nil
}
lines := strings.Split(output, "\n")
var pids []int
for _, line := range lines {
line = strings.TrimSpace(line)
if line == "" {
continue
}
var pid int
if _, err := fmt.Sscanf(line, "%d", &pid); err == nil {
pids = append(pids, pid)
}
}
return pids
}
// getExpectedPortForProcess extracts the expected port for a process.
// Checks URLs from URL tracker, then falls back to command-line parsing.
func (d *Daemon) getExpectedPortForProcess(proc *goprocess.ManagedProcess) int {
// First, check URLs from URL tracker
urls := d.urlTracker.GetURLs(proc.ID)
for _, urlStr := range urls {
if port := extractPortFromURL(urlStr); port > 0 {
return port
}
}
// Fall back to command-line parsing
return extractPortFromCommand(proc.Command, proc.Args)
}
// extractPortFromURL extracts a port number from a URL string.
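// For example, "http://localhost:5173/app" yields 5173; URLs without one of
// these host:port forms yield 0.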
func extractPortFromURL(urlStr string) int {
// Simple pattern match for localhost URLs with ports
patterns := []string{
`localhost:(\d+)`,
`127\.0\.0\.1:(\d+)`,
`0\.0\.0\.0:(\d+)`,
`\[::1\]:(\d+)`,
}
for _, pattern := range patterns {
re := regexp.MustCompile(pattern)
if matches := re.FindStringSubmatch(urlStr); len(matches) > 1 {
if port, err := strconv.Atoi(matches[1]); err == nil && port > 0 && port < 65536 {
return port
}
}
}
return 0
}
// registerAgntCommands registers agnt-specific commands with the Hub.
// This enables Hub's command dispatch to route these commands to the daemon's handlers.
// Note: Registering a command that Hub already registered will override Hub's handler.
func (d *Daemon) registerAgntCommands() {
// PROC command - override Hub's to add URL tracking and project filtering
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "PROC",
SubVerbs: []string{"STATUS", "OUTPUT", "STOP", "RESTART", "LIST", "CLEANUP-PORT"},
Description: "Manage running processes",
Handler: d.hubHandleProc,
})
// DETECT command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "DETECT",
Description: "Detect project type and available scripts",
Handler: d.hubHandleDetect,
})
// PROXY command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "PROXY",
SubVerbs: []string{"START", "STOP", "RESTART", "STATUS", "LIST", "EXEC", "TOAST"},
Description: "Manage reverse proxies",
Handler: d.hubHandleProxy,
})
// PROXYLOG command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "PROXYLOG",
SubVerbs: []string{"QUERY", "SUMMARY", "CLEAR", "STATS"},
Description: "Query proxy traffic logs",
Handler: d.hubHandleProxyLog,
})
// CURRENTPAGE command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "CURRENTPAGE",
SubVerbs: []string{"LIST", "GET", "SUMMARY", "CLEAR"},
Description: "View active page sessions",
Handler: d.hubHandleCurrentPage,
})
// OVERLAY command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "OVERLAY",
SubVerbs: []string{"SET", "GET", "CLEAR", "ACTIVITY"},
Description: "Configure overlay endpoint",
Handler: d.hubHandleOverlay,
})
// TUNNEL command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "TUNNEL",
SubVerbs: []string{"START", "STOP", "STATUS", "LIST"},
Description: "Manage tunnel connections",
Handler: d.hubHandleTunnel,
})
// CHAOS command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "CHAOS",
SubVerbs: []string{"ENABLE", "DISABLE", "STATUS", "PRESET", "SET", "ADD-RULE", "REMOVE-RULE", "LIST-RULES", "STATS", "CLEAR", "LIST-PRESETS"},
Description: "Configure chaos engineering rules",
Handler: d.hubHandleChaos,
})
// SESSION command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "SESSION",
SubVerbs: []string{"REGISTER", "UNREGISTER", "HEARTBEAT", "LIST", "GET", "SEND", "SCHEDULE", "CANCEL", "TASKS", "FIND", "ATTACH", "URL"},
Description: "Manage client sessions",
Handler: d.hubHandleSession,
})
// STATUS command - returns full daemon info (Hub's INFO is minimal)
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "STATUS",
Description: "Get full daemon status and statistics",
Handler: d.hubHandleStatus,
})
// STORE command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "STORE",
SubVerbs: []string{"GET", "SET", "DELETE", "LIST", "CLEAR", "GET-ALL"},
Description: "Manage persistent key-value storage",
Handler: d.hubHandleStore,
})
// AUTOMATE command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "AUTOMATE",
SubVerbs: []string{"PROCESS", "BATCH"},
Description: "Process automation tasks using AI",
Handler: d.hubHandleAutomate,
})
// STOP-ALL command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "STOP-ALL",
Description: "Stop all running processes, proxies, and tunnels",
Handler: d.hubHandleStopAll,
})
// RESTART-ALL command
d.hub.RegisterCommand(hubpkg.CommandDefinition{
Verb: "RESTART-ALL",
Description: "Restart all processes and proxies using .agnt.kdl config",
Handler: d.hubHandleRestartAll,
})
log.Printf("[DEBUG] Registered %d agnt-specific commands with Hub", 14)
}
// hubHandleProc handles the PROC command (overrides Hub's built-in).
// Adds URL tracking and project-based filtering.
func (d *Daemon) hubHandleProc(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "PROC %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "STATUS":
return d.hubHandleProcStatus(ctx, conn, cmd)
case "OUTPUT":
return d.hubHandleProcOutput(ctx, conn, cmd)
case "STOP":
return d.hubHandleProcStop(ctx, conn, cmd)
case "RESTART":
return d.hubHandleProcRestart(ctx, conn, cmd)
case "LIST":
return d.hubHandleProcList(ctx, conn, cmd)
case "CLEANUP-PORT":
return d.hubHandleProcCleanupPort(ctx, conn, cmd)
case "":
return writeStructuredErr(conn, "daemon", &hubproto.StructuredError{
Code: hubproto.ErrMissingParam,
Message: "action required",
Command: "PROC",
Param: "action",
ValidActions: []string{"STATUS", "OUTPUT", "STOP", "RESTART", "LIST", "CLEANUP-PORT"},
})
default:
return writeStructuredErr(conn, "daemon", &hubproto.StructuredError{
Code: hubproto.ErrInvalidAction,
Message: "unknown action",
Command: "PROC",
Action: cmd.SubVerb,
ValidActions: []string{"STATUS", "OUTPUT", "STOP", "RESTART", "LIST", "CLEANUP-PORT"},
})
}
}
// hubHandleProcStatus handles PROC STATUS <id>.
func (d *Daemon) hubHandleProcStatus(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrMissingParam, "process_id required")
}
processID := cmd.Args[0]
proc, err := d.hub.ProcessManager().Get(processID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("process %q not found", processID))
}
resp := map[string]interface{}{
"id": proc.ID,
"command": proc.Command,
"args": proc.Args,
"state": proc.State().String(),
"summary": proc.Summary(),
"runtime": formatDuration(proc.Runtime()),
"runtime_ms": proc.Runtime().Milliseconds(),
"project_path": proc.ProjectPath,
}
if pid := proc.PID(); pid > 0 {
resp["pid"] = pid
}
if s := proc.State(); s == goprocess.StateStopped || s == goprocess.StateFailed {
resp["exit_code"] = proc.ExitCode()
}
// Add URLs from URL tracker
if urls := d.urlTracker.GetURLs(processID); len(urls) > 0 {
resp["urls"] = urls
}
// Check for rogue process using the same port
if rogueInfo := d.detectRogueProcess(ctx, proc); rogueInfo != nil && rogueInfo.HasWarning {
resp["warning"] = fmt.Sprintf(
"Port %d is in use by unmanaged process (PID %v). "+
"Run 'proc {action: \"restart\", process_id: \"%s\"}' to kill the rogue process and restart properly.",
rogueInfo.Port, rogueInfo.PIDs, proc.ID)
resp["rogue_process"] = map[string]interface{}{
"port": rogueInfo.Port,
"pids": rogueInfo.PIDs,
}
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProcOutput handles PROC OUTPUT <id> [filter].
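// The optional data payload is a hubproto.OutputFilter: its Stream field selects
// stdout, stderr, or combined output, and Grep, GrepV, Head, and Tail trim the
// returned lines (the JSON key names are defined by the hubproto package).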
func (d *Daemon) hubHandleProcOutput(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrMissingParam, "process_id required")
}
processID := cmd.Args[0]
proc, err := d.hub.ProcessManager().Get(processID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("process %q not found", processID))
}
// Parse optional filter from JSON data
var filter hubproto.OutputFilter
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &filter); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, fmt.Sprintf("invalid filter JSON: %v", err))
}
}
var output []byte
switch filter.Stream {
case "stdout":
output, _ = proc.Stdout()
case "stderr":
output, _ = proc.Stderr()
default:
output, _ = proc.CombinedOutput()
}
// Apply filters
lines := strings.Split(string(output), "\n")
var filtered []string
for _, line := range lines {
if filter.Grep != "" {
match := strings.Contains(line, filter.Grep)
if filter.GrepV {
match = !match
}
if !match {
continue
}
}
filtered = append(filtered, line)
}
// Apply head/tail limits
if filter.Head > 0 && len(filtered) > filter.Head {
filtered = filtered[:filter.Head]
}
if filter.Tail > 0 && len(filtered) > filter.Tail {
filtered = filtered[len(filtered)-filter.Tail:]
}
// Return output as chunked response (client expects CHUNK + END for .String())
outputStr := strings.Join(filtered, "\n")
if err := conn.WriteChunk([]byte(outputStr)); err != nil {
return err
}
return conn.WriteEnd()
}
// hubHandleProcStop handles PROC STOP <id>.
func (d *Daemon) hubHandleProcStop(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrMissingParam, "process_id required")
}
processID := cmd.Args[0]
proc, err := d.hub.ProcessManager().Get(processID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("process %q not found", processID))
}
if !proc.IsRunning() {
resp := map[string]interface{}{
"process_id": processID,
"state": proc.State().String(),
"success": true,
"message": fmt.Sprintf("process %q already stopped", processID),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
if err := d.hub.ProcessManager().Stop(ctx, processID); err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("failed to stop: %v", err))
}
resp := map[string]interface{}{
"process_id": processID,
"state": "stopped",
"success": true,
"message": fmt.Sprintf("process %q stopped", processID),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProcList handles PROC LIST [filter].
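// The optional data payload is a hubproto.DirectoryFilter: it can scope the
// listing to an explicit directory, to a session's project path (via its
// session code), or to every project when its global flag is set; with no
// payload the connection's own session scope is used.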
func (d *Daemon) hubHandleProcList(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
procs := d.hub.ProcessManager().List()
// Parse directory filter from JSON data (optional)
var dirFilter hubproto.DirectoryFilter
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &dirFilter); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, fmt.Sprintf("invalid filter JSON: %v", err))
}
}
// Resolve the project path for filtering
var projectPath string
var sessionCode string
filteredProcs := procs
if dirFilter.Global {
// No filtering - return all processes
} else if dirFilter.SessionCode != "" {
sessionCode = dirFilter.SessionCode
session, ok := d.sessionRegistry.Get(sessionCode)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("session %q not found", sessionCode))
}
projectPath = session.ProjectPath
} else if dirFilter.Directory != "" {
projectPath = dirFilter.Directory
} else if connSession := conn.SessionCode(); connSession != "" {
sessionCode = connSession
session, ok := d.sessionRegistry.Get(sessionCode)
if ok {
projectPath = session.ProjectPath
}
}
// Filter processes by project path
if !dirFilter.Global && projectPath != "" {
normalizedDir := normalizePath(projectPath)
var filtered []*goprocess.ManagedProcess
for _, p := range procs {
if normalizePath(p.ProjectPath) == normalizedDir {
filtered = append(filtered, p)
}
}
filteredProcs = filtered
}
entries := make([]map[string]interface{}, len(filteredProcs))
var warnings []string
for i, p := range filteredProcs {
entry := map[string]interface{}{
"id": p.ID,
"command": p.Command,
"state": p.State().String(),
"summary": p.Summary(),
"runtime": formatDuration(p.Runtime()),
"runtime_ms": p.Runtime().Milliseconds(),
"project_path": p.ProjectPath,
}
// Add URLs from URL tracker
if urls := d.urlTracker.GetURLs(p.ID); len(urls) > 0 {
entry["urls"] = urls
}
// Check for rogue process using the same port
if rogueInfo := d.detectRogueProcess(ctx, p); rogueInfo != nil && rogueInfo.HasWarning {
warning := fmt.Sprintf(
"Process %q shows as %s but port %d is in use by unmanaged process (PID %v). "+
"Run 'proc {action: \"restart\", process_id: \"%s\"}' to kill the rogue process and restart properly.",
p.ID, p.State().String(), rogueInfo.Port, rogueInfo.PIDs, p.ID)
entry["warning"] = warning
entry["rogue_process"] = map[string]interface{}{
"port": rogueInfo.Port,
"pids": rogueInfo.PIDs,
}
warnings = append(warnings, warning)
}
entries[i] = entry
}
resp := map[string]interface{}{
"count": len(filteredProcs),
"processes": entries,
"global": dirFilter.Global,
"total_in_daemon": len(procs),
}
if projectPath != "" {
resp["project_path"] = normalizePath(projectPath)
}
if sessionCode != "" {
resp["session_code"] = sessionCode
}
if len(warnings) > 0 {
resp["warnings"] = warnings
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProcCleanupPort handles PROC CLEANUP-PORT <port>.
func (d *Daemon) hubHandleProcCleanupPort(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrMissingParam, "port required")
}
port, err := strconv.Atoi(cmd.Args[0])
if err != nil || port <= 0 || port > 65535 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid port number")
}
pids, err := d.hub.ProcessManager().KillProcessByPort(ctx, port)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
resp := map[string]interface{}{
"port": port,
"killed_count": len(pids),
"killed_pids": pids,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// formatDuration formats a duration for display.
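// For example: 450ms -> "450ms", 90s -> "1m30s", 125m -> "2h5m".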
func formatDuration(d time.Duration) string {
if d < time.Second {
return fmt.Sprintf("%dms", d.Milliseconds())
}
if d < time.Minute {
return fmt.Sprintf("%.1fs", d.Seconds())
}
if d < time.Hour {
m := int(d.Minutes())
s := int(d.Seconds()) % 60
return fmt.Sprintf("%dm%ds", m, s)
}
h := int(d.Hours())
m := int(d.Minutes()) % 60
return fmt.Sprintf("%dh%dm", h, m)
}
// hubHandleDetect handles the DETECT command.
func (d *Daemon) hubHandleDetect(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "DETECT: args=%v", cmd.Args)
path := "."
if len(cmd.Args) > 0 {
path = cmd.Args[0]
}
proj, err := project.Detect(path)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
resp := map[string]interface{}{
"type": proj.Type,
"path": proj.Path,
"package_manager": proj.PackageManager,
"scripts": project.GetCommandNames(proj),
}
data, err := json.Marshal(resp)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteJSON(data)
}
// hubHandleProxy handles the PROXY command and its sub-verbs.
func (d *Daemon) hubHandleProxy(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "PROXY %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "START":
return d.hubHandleProxyStart(ctx, conn, cmd)
case "STOP":
return d.hubHandleProxyStop(ctx, conn, cmd)
case "RESTART":
return d.hubHandleProxyRestart(ctx, conn, cmd)
case "STATUS":
return d.hubHandleProxyStatus(conn, cmd)
case "LIST":
return d.hubHandleProxyList(conn, cmd)
case "EXEC":
return d.hubHandleProxyExec(conn, cmd)
case "TOAST":
return d.hubHandleProxyToast(conn, cmd)
default:
return writeStructuredErr(conn, "daemon", &hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown PROXY sub-command",
Command: "PROXY",
ValidActions: []string{"START", "STOP", "RESTART", "STATUS", "LIST", "EXEC", "TOAST"},
})
}
}
// hubHandleProxyStart handles PROXY START command.
// PROXY START <id> <target_url> <port> [max_log_size]
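// Example (illustrative ID, target, and ports): PROXY START web http://localhost:3000 8080 2000
// An optional JSON data payload supplies extended settings using the keys
// declared below, for example (illustrative values):
// {"path":"/home/me/app","bind_address":"0.0.0.0","verify_tls":true}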
func (d *Daemon) hubHandleProxyStart(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 3 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY START requires: <id> <target_url> <port>")
}
proxyID := cmd.Args[0]
targetURL := cmd.Args[1]
port, err := strconv.Atoi(cmd.Args[2])
if err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid port")
}
maxLogSize := 1000
if len(cmd.Args) > 3 {
// Keep the default if the optional argument is not a valid number.
if v, err := strconv.Atoi(cmd.Args[3]); err == nil {
maxLogSize = v
}
}
// Parse extended config from JSON data (optional)
path := "."
bindAddress := ""
publicURL := ""
verifyTLS := false
if len(cmd.Data) > 0 {
var data struct {
Path string `json:"path"`
BindAddress string `json:"bind_address"`
PublicURL string `json:"public_url"`
VerifyTLS bool `json:"verify_tls"`
}
if err := json.Unmarshal(cmd.Data, &data); err == nil {
if data.Path != "" {
path = data.Path
}
bindAddress = data.BindAddress
publicURL = data.PublicURL
verifyTLS = data.VerifyTLS
}
}
// Create proxy config
proxyConfig := proxy.ProxyConfig{
ID: proxyID,
TargetURL: targetURL,
ListenPort: port,
MaxLogSize: maxLogSize,
AutoRestart: true,
Path: normalizePath(path),
BindAddress: bindAddress,
PublicURL: publicURL,
VerifyTLS: verifyTLS,
}
proxyServer, err := d.proxym.Create(ctx, proxyConfig)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Prefer the session-specific overlay endpoint for this project; fall back to
// the global overlay endpoint when no matching session is registered.
if session, ok := d.sessionRegistry.FindByDirectory(normalizePath(path)); ok && session.OverlayPath != "" {
proxyServer.SetOverlayEndpoint(session.OverlayPath)
log.Printf("[DEBUG] Set session-specific overlay endpoint for proxy %s: %s", proxyID, session.OverlayPath)
} else if endpoint := d.OverlayEndpoint(); endpoint != "" {
proxyServer.SetOverlayEndpoint(endpoint)
log.Printf("[DEBUG] Set global overlay endpoint for proxy %s: %s", proxyID, endpoint)
}
// Persist proxy config
if d.stateMgr != nil {
d.stateMgr.AddProxy(PersistentProxyConfig{
ID: proxyID,
TargetURL: targetURL,
Port: port,
MaxLogSize: maxLogSize,
Path: path,
})
}
resp := map[string]interface{}{
"id": proxyServer.ID,
"listen_addr": proxyServer.ListenAddr,
"target_url": proxyServer.TargetURL.String(),
"status": "running",
}
if proxyServer.BindAddress != "" {
resp["bind_address"] = proxyServer.BindAddress
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProxyStop handles PROXY STOP command.
func (d *Daemon) hubHandleProxyStop(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY STOP requires: <id>")
}
proxyID := cmd.Args[0]
// Use session-scoped lookup to resolve the proxy
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Stop using the resolved full ID
if err := d.proxym.Stop(ctx, p.ID); err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Remove from persisted state
if d.stateMgr != nil {
d.stateMgr.RemoveProxy(p.ID)
}
return conn.WriteOK("proxy stopped")
}
// hubHandleProxyStatus handles PROXY STATUS command.
func (d *Daemon) hubHandleProxyStatus(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY STATUS requires: <id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
resp := map[string]interface{}{
"id": p.ID,
"listen_addr": p.ListenAddr,
"target_url": p.TargetURL.String(),
"status": "running",
"stats": p.Stats(),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProxyList handles PROXY LIST command.
func (d *Daemon) hubHandleProxyList(conn *hubpkg.Connection, cmd *hubproto.Command) error {
// Parse filter from command data
var dirFilter hubproto.DirectoryFilter
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &dirFilter)
}
// Resolve filter path from session code or directory
filterPath := ""
if !dirFilter.Global {
if dirFilter.SessionCode != "" {
// Look up session to get project path
if session, ok := d.sessionRegistry.Get(dirFilter.SessionCode); ok {
filterPath = normalizePath(session.ProjectPath)
}
} else if dirFilter.Directory != "" {
filterPath = normalizePath(dirFilter.Directory)
}
}
proxies := d.proxym.List()
var result []map[string]interface{}
for _, p := range proxies {
proxyPath := normalizePath(p.Path)
// Filter by path if not global and we have a filter path
if !dirFilter.Global && filterPath != "" && filterPath != "." && proxyPath != filterPath {
continue
}
result = append(result, map[string]interface{}{
"id": p.ID,
"listen_addr": p.ListenAddr,
"target_url": p.TargetURL.String(),
"status": "running",
"running": true,
"path": p.Path,
})
}
data, _ := json.Marshal(map[string]interface{}{
"proxies": result,
"count": len(result),
})
return conn.WriteJSON(data)
}
// hubHandleProxyExec handles PROXY EXEC command.
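// The data payload is the raw JavaScript source handed to ExecuteJavaScript;
// the result (or error) is awaited on the returned channel with a 30s timeout.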
func (d *Daemon) hubHandleProxyExec(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY EXEC requires: <id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Code is in the data payload
if len(cmd.Data) == 0 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY EXEC requires code")
}
code := string(cmd.Data)
execID, resultChan, err := p.ExecuteJavaScript(code)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Wait for result with timeout
timeout := 30 * time.Second
select {
case result := <-resultChan:
if result == nil {
return conn.WriteErr(hubproto.ErrInternal, "execution channel closed")
}
resp := map[string]interface{}{
"execution_id": execID,
"success": result.Error == "",
"result": result.Result,
"error": result.Error,
"duration": result.Duration.String(),
}
// Include file path for large results
if result.FilePath != "" {
resp["file_path"] = result.FilePath
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
case <-time.After(timeout):
return conn.WriteErr(hubproto.ErrTimeout, "execution timed out")
}
}
// hubHandleProxyToast handles PROXY TOAST command.
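// The data payload uses the JSON keys declared below, for example (illustrative
// values): {"toast_message":"Build finished","toast_type":"info","toast_title":"CI","toast_duration":5000}.
// toast_type defaults to "info" and toast_duration is passed through to
// BroadcastToast unchanged.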
func (d *Daemon) hubHandleProxyToast(conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "PROXY TOAST: args=%v dataLen=%d", cmd.Args, len(cmd.Data))
if len(cmd.Args) < 1 {
debug.Log("daemon", "PROXY TOAST: missing proxy ID")
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY TOAST requires: <id>")
}
proxyID := cmd.Args[0]
debug.Log("daemon", "PROXY TOAST: proxyID=%s", proxyID)
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
debug.Log("daemon", "PROXY TOAST: proxy not found: %v", err)
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
debug.Log("daemon", "PROXY TOAST: found proxy %s at %s", p.ID, p.ListenAddr)
// Toast config is in the data payload
if len(cmd.Data) == 0 {
debug.Log("daemon", "PROXY TOAST: no data payload")
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY TOAST requires toast config")
}
var toast struct {
Message string `json:"toast_message"`
Type string `json:"toast_type"`
Title string `json:"toast_title"`
Duration int `json:"toast_duration"`
}
if err := json.Unmarshal(cmd.Data, &toast); err != nil {
debug.Log("daemon", "PROXY TOAST: failed to unmarshal: %v", err)
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid toast config: "+err.Error())
}
if toast.Type == "" {
toast.Type = "info"
}
if toast.Message == "" {
debug.Log("daemon", "PROXY TOAST: empty message")
return conn.WriteErr(hubproto.ErrInvalidArgs, "toast_message is required")
}
debug.Log("daemon", "PROXY TOAST: sending type=%s title=%q message=%q", toast.Type, toast.Title, toast.Message)
sentCount, err := p.BroadcastToast(toast.Type, toast.Title, toast.Message, toast.Duration)
if err != nil {
debug.Log("daemon", "PROXY TOAST: broadcast error: %v", err)
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
debug.Log("daemon", "PROXY TOAST: sent to %d clients", sentCount)
resp := map[string]interface{}{
"success": true,
"sent_count": sentCount,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProxyLog handles the PROXYLOG command and its sub-verbs.
func (d *Daemon) hubHandleProxyLog(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "PROXYLOG %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "QUERY", "":
return d.hubHandleProxyLogQuery(conn, cmd)
case "SUMMARY":
return d.hubHandleProxyLogSummary(conn, cmd)
case "CLEAR":
return d.hubHandleProxyLogClear(conn, cmd)
case "STATS":
return d.hubHandleProxyLogStats(conn, cmd)
default:
return writeStructuredErr(conn, "daemon", &hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown PROXYLOG sub-command",
Command: "PROXYLOG",
ValidActions: []string{"QUERY", "SUMMARY", "CLEAR", "STATS"},
})
}
}
// hubHandleProxyLogQuery handles PROXYLOG QUERY command.
func (d *Daemon) hubHandleProxyLogQuery(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXYLOG requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
var filter proxy.LogFilter
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &filter)
}
entries := p.Logger().Query(filter)
data, _ := json.Marshal(map[string]interface{}{"logs": entries})
return conn.WriteJSON(data)
}
// hubHandleProxyLogSummary handles PROXYLOG SUMMARY command.
func (d *Daemon) hubHandleProxyLogSummary(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXYLOG SUMMARY requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// For summary, return aggregate stats from the proxy's traffic logger
stats := p.Logger().Stats()
resp := map[string]interface{}{
"stats": stats,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProxyLogClear handles PROXYLOG CLEAR command.
func (d *Daemon) hubHandleProxyLogClear(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXYLOG CLEAR requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
p.Logger().Clear()
return conn.WriteOK("logs cleared")
}
// hubHandleProxyLogStats handles PROXYLOG STATS command.
func (d *Daemon) hubHandleProxyLogStats(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXYLOG STATS requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
stats := p.Logger().Stats()
data, _ := json.Marshal(stats)
return conn.WriteJSON(data)
}
// hubHandleCurrentPage handles the CURRENTPAGE command.
func (d *Daemon) hubHandleCurrentPage(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "CURRENTPAGE %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "LIST", "":
return d.hubHandleCurrentPageList(conn, cmd)
case "GET":
return d.hubHandleCurrentPageGet(conn, cmd)
case "SUMMARY":
return d.hubHandleCurrentPageSummary(conn, cmd)
case "CLEAR":
return d.hubHandleCurrentPageClear(conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown CURRENTPAGE sub-command",
Command: "CURRENTPAGE",
ValidActions: []string{"LIST", "GET", "SUMMARY", "CLEAR"},
})
}
}
// hubHandleCurrentPageList handles CURRENTPAGE LIST command.
func (d *Daemon) hubHandleCurrentPageList(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CURRENTPAGE LIST requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Return lightweight summaries instead of full sessions with massive arrays
summaries := p.PageTracker().GetActiveSessionSummaries()
resp := map[string]interface{}{
"sessions": summaries,
"count": len(summaries),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleCurrentPageGet handles CURRENTPAGE GET command.
func (d *Daemon) hubHandleCurrentPageGet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 2 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CURRENTPAGE GET requires: <proxy_id> <session_id>")
}
proxyID := cmd.Args[0]
sessionID := cmd.Args[1]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
session, ok := p.PageTracker().GetSession(sessionID)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, "session not found")
}
data, _ := json.Marshal(session)
return conn.WriteJSON(data)
}
// hubHandleCurrentPageSummary handles CURRENTPAGE SUMMARY command.
func (d *Daemon) hubHandleCurrentPageSummary(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 2 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CURRENTPAGE SUMMARY requires: <proxy_id> <session_id>")
}
proxyID := cmd.Args[0]
sessionID := cmd.Args[1]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
session, ok := p.PageTracker().GetSession(sessionID)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, "session not found")
}
// Currently this returns the full session record, the same payload as CURRENTPAGE GET
data, _ := json.Marshal(session)
return conn.WriteJSON(data)
}
// hubHandleCurrentPageClear handles CURRENTPAGE CLEAR command.
func (d *Daemon) hubHandleCurrentPageClear(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CURRENTPAGE CLEAR requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
p.PageTracker().Clear()
return conn.WriteOK("page sessions cleared")
}
// hubHandleOverlay handles the OVERLAY command.
func (d *Daemon) hubHandleOverlay(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "OVERLAY %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "SET":
return d.hubHandleOverlaySet(conn, cmd)
case "GET", "":
return d.hubHandleOverlayGet(conn)
case "CLEAR":
return d.hubHandleOverlayClear(conn)
case "ACTIVITY":
return d.hubHandleOverlayActivity(conn, cmd)
case "OUTPUT-PREVIEW":
return d.hubHandleOverlayOutputPreview(conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown OVERLAY sub-command",
Command: "OVERLAY",
ValidActions: []string{"SET", "GET", "CLEAR", "ACTIVITY", "OUTPUT-PREVIEW"},
})
}
}
// hubHandleOverlaySet handles OVERLAY SET command.
func (d *Daemon) hubHandleOverlaySet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var config struct {
Endpoint string `json:"endpoint"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &config)
}
if config.Endpoint == "" {
return conn.WriteErr(hubproto.ErrInvalidArgs, "endpoint is required")
}
d.SetOverlayEndpoint(config.Endpoint)
return conn.WriteOK("overlay endpoint set")
}
// hubHandleOverlayGet handles OVERLAY GET command.
func (d *Daemon) hubHandleOverlayGet(conn *hubpkg.Connection) error {
endpoint := d.OverlayEndpoint()
resp := map[string]interface{}{
"endpoint": endpoint,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleOverlayClear handles OVERLAY CLEAR command.
func (d *Daemon) hubHandleOverlayClear(conn *hubpkg.Connection) error {
d.SetOverlayEndpoint("")
return conn.WriteOK("overlay endpoint cleared")
}
// hubHandleOverlayActivity handles OVERLAY ACTIVITY command.
// Args: <active:true/false> [proxyID1 proxyID2 ...]
// Broadcasts activity state to specified proxies (or all proxies if none specified).
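// Example (illustrative proxy ID): OVERLAY ACTIVITY true web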
func (d *Daemon) hubHandleOverlayActivity(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "OVERLAY ACTIVITY requires: <active:true/false> [proxyIDs...]")
}
// Parse active state (strconv.ParseBool accepts true/false, 1/0, t/f, etc.)
active, err := strconv.ParseBool(cmd.Args[0])
if err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, fmt.Sprintf("invalid active value %q: expected true or false", cmd.Args[0]))
}
// Get proxy IDs (if specified)
proxyIDs := cmd.Args[1:]
// Broadcast to specified proxies or all proxies
var proxiesToBroadcast []*proxy.ProxyServer
if len(proxyIDs) > 0 {
// Broadcast to specific proxies
for _, proxyID := range proxyIDs {
p, err := d.proxym.Get(proxyID)
if err != nil {
log.Printf("[WARN] Proxy %s not found for activity broadcast: %v", proxyID, err)
continue
}
proxiesToBroadcast = append(proxiesToBroadcast, p)
}
} else {
// Broadcast to all proxies
proxiesToBroadcast = d.proxym.List()
}
// Broadcast activity state to each proxy
totalSent := 0
for _, p := range proxiesToBroadcast {
sentCount := p.BroadcastActivityState(active)
totalSent += sentCount
}
data, _ := json.Marshal(map[string]interface{}{
"status": "ok",
"active": active,
"proxies": len(proxiesToBroadcast),
"clients_sent": totalSent,
})
return conn.WriteJSON(data)
}
// hubHandleOverlayOutputPreview handles OVERLAY OUTPUT-PREVIEW command.
// Broadcasts output preview lines to connected browsers via proxies.
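// The JSON payload uses the keys declared below, for example (illustrative
// values): {"lines":["compiling...","done in 1.2s"],"proxy_ids":["web"]};
// omit proxy_ids to broadcast to every proxy.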
func (d *Daemon) hubHandleOverlayOutputPreview(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var payload struct {
Lines []string `json:"lines"`
ProxyIDs []string `json:"proxy_ids"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &payload); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid payload")
}
}
if len(payload.Lines) == 0 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "lines required")
}
// Get proxies to broadcast to
var proxiesToBroadcast []*proxy.ProxyServer
if len(payload.ProxyIDs) > 0 {
for _, proxyID := range payload.ProxyIDs {
p, err := d.proxym.Get(proxyID)
if err != nil {
continue
}
proxiesToBroadcast = append(proxiesToBroadcast, p)
}
} else {
proxiesToBroadcast = d.proxym.List()
}
// Broadcast to each proxy
totalSent := 0
for _, p := range proxiesToBroadcast {
sentCount := p.BroadcastOutputPreview(payload.Lines)
totalSent += sentCount
}
data, _ := json.Marshal(map[string]interface{}{
"status": "ok",
"lines": len(payload.Lines),
"proxies": len(proxiesToBroadcast),
"clients_sent": totalSent,
})
return conn.WriteJSON(data)
}
// hubHandleTunnel handles the TUNNEL command.
func (d *Daemon) hubHandleTunnel(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "TUNNEL %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "START":
return d.hubHandleTunnelStart(ctx, conn, cmd)
case "STOP":
return d.hubHandleTunnelStop(ctx, conn, cmd)
case "STATUS":
return d.hubHandleTunnelStatus(conn, cmd)
case "LIST":
return d.hubHandleTunnelList(conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown TUNNEL sub-command",
Command: "TUNNEL",
ValidActions: []string{"START", "STOP", "STATUS", "LIST"},
})
}
}
// hubHandleTunnelStart handles TUNNEL START command.
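// TUNNEL START <id> with a JSON data payload using the keys declared below,
// for example (illustrative values; valid provider names are defined by the
// tunnel package): {"provider":"cloudflared","local_port":8080,"proxy_id":"web"}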
func (d *Daemon) hubHandleTunnelStart(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "TUNNEL START requires: <id>")
}
tunnelID := cmd.Args[0]
var config struct {
Provider string `json:"provider"`
LocalPort int `json:"local_port"`
LocalHost string `json:"local_host"`
ProxyID string `json:"proxy_id"`
BinaryPath string `json:"binary_path"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &config)
}
if config.Provider == "" {
return conn.WriteErr(hubproto.ErrInvalidArgs, "provider is required")
}
if config.LocalPort == 0 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "local_port is required")
}
// Get project path from session for session scoping
projectPath := d.getSessionProjectPath(conn)
tunnelConfig := tunnel.Config{
Provider: tunnel.Provider(config.Provider),
LocalPort: config.LocalPort,
LocalHost: config.LocalHost,
BinaryPath: config.BinaryPath,
Path: projectPath,
}
t, err := d.tunnelm.Start(ctx, tunnelID, tunnelConfig)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Wait for public URL
publicURL, err := t.WaitForURL(ctx)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("tunnel started but failed to get URL: %v", err))
}
// Update proxy public URL if proxy_id specified
if config.ProxyID != "" {
if p, err := d.getSessionScopedProxy(conn, config.ProxyID); err == nil {
p.SetPublicURL(publicURL)
}
}
resp := map[string]interface{}{
"id": tunnelID,
"provider": config.Provider,
"local_port": config.LocalPort,
"public_url": publicURL,
"status": "running",
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleTunnelStop handles TUNNEL STOP command.
func (d *Daemon) hubHandleTunnelStop(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "TUNNEL STOP requires: <id>")
}
tunnelID := cmd.Args[0]
// Use session-scoped lookup to find the tunnel
t, err := d.getSessionScopedTunnel(conn, tunnelID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Stop using the resolved full ID
if err := d.tunnelm.Stop(ctx, t.ID()); err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
return conn.WriteOK("tunnel stopped")
}
// hubHandleTunnelStatus handles TUNNEL STATUS command.
func (d *Daemon) hubHandleTunnelStatus(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "TUNNEL STATUS requires: <id>")
}
tunnelID := cmd.Args[0]
// Use session-scoped lookup to find the tunnel
t, err := d.getSessionScopedTunnel(conn, tunnelID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
info := t.Info()
resp := map[string]interface{}{
"id": info.ID,
"provider": string(info.Provider),
"state": info.State,
"public_url": info.PublicURL,
"local_addr": info.LocalAddr,
"path": info.Path,
}
if info.Error != "" {
resp["error"] = info.Error
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleTunnelList handles TUNNEL LIST command.
func (d *Daemon) hubHandleTunnelList(conn *hubpkg.Connection, cmd *hubproto.Command) error {
// Parse filter from command data
var dirFilter hubproto.DirectoryFilter
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &dirFilter)
}
var infos []tunnel.TunnelInfo
if dirFilter.Global {
// Global: list all tunnels
infos = d.tunnelm.List()
} else {
// Session-scoped: filter by project path
projectPath := d.getSessionProjectPath(conn)
if projectPath != "" {
infos = d.tunnelm.ListByPath(projectPath)
} else {
// No session, return all (fallback for non-session connections)
infos = d.tunnelm.List()
}
}
entries := make([]map[string]interface{}, len(infos))
for i, info := range infos {
entry := map[string]interface{}{
"id": info.ID,
"provider": string(info.Provider),
"state": info.State,
"public_url": info.PublicURL,
"local_addr": info.LocalAddr,
"path": info.Path,
}
if info.Error != "" {
entry["error"] = info.Error
}
entries[i] = entry
}
data, _ := json.Marshal(map[string]interface{}{"tunnels": entries})
return conn.WriteJSON(data)
}
// hubHandleChaos handles the CHAOS command.
func (d *Daemon) hubHandleChaos(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "CHAOS %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "ENABLE":
return d.hubHandleChaosEnable(conn, cmd)
case "DISABLE":
return d.hubHandleChaosDisable(conn, cmd)
case "STATUS":
return d.hubHandleChaosStatus(conn, cmd)
case "PRESET":
return d.hubHandleChaosPreset(conn, cmd)
case "SET":
return d.hubHandleChaosSet(conn, cmd)
case "ADD-RULE":
return d.hubHandleChaosAddRule(conn, cmd)
case "REMOVE-RULE":
return d.hubHandleChaosRemoveRule(conn, cmd)
case "LIST-RULES":
return d.hubHandleChaosListRules(conn, cmd)
case "STATS":
return d.hubHandleChaosStats(conn, cmd)
case "CLEAR":
return d.hubHandleChaosClear(conn, cmd)
case "LIST-PRESETS":
return d.hubHandleChaosListPresets(conn)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown CHAOS sub-command",
Command: "CHAOS",
ValidActions: []string{"ENABLE", "DISABLE", "STATUS", "PRESET", "SET", "ADD-RULE", "REMOVE-RULE", "LIST-RULES", "STATS", "CLEAR", "LIST-PRESETS"},
})
}
}
// hubHandleChaosEnable handles CHAOS ENABLE command.
func (d *Daemon) hubHandleChaosEnable(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS ENABLE requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
p.ChaosEngine().Enable()
return conn.WriteOK("chaos enabled")
}
// hubHandleChaosDisable handles CHAOS DISABLE command.
func (d *Daemon) hubHandleChaosDisable(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS DISABLE requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
p.ChaosEngine().Disable()
return conn.WriteOK("chaos disabled")
}
// hubHandleChaosStatus handles CHAOS STATUS command.
func (d *Daemon) hubHandleChaosStatus(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS STATUS requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
engine := p.ChaosEngine()
config := engine.GetConfig()
resp := map[string]interface{}{
"enabled": engine.IsEnabled(),
"config": config,
"stats": engine.GetStats(),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleChaosPreset handles CHAOS PRESET command.
func (d *Daemon) hubHandleChaosPreset(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS PRESET requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
var config struct {
Preset string `json:"chaos_preset"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &config)
}
if config.Preset == "" {
return conn.WriteErr(hubproto.ErrInvalidArgs, "chaos_preset is required")
}
presetConfig := proxy.GetPreset(config.Preset)
if presetConfig == nil {
availablePresets := proxy.ListPresets()
return conn.WriteErr(hubproto.ErrInvalidArgs, fmt.Sprintf("unknown preset %q. Available: %s", config.Preset, strings.Join(availablePresets, ", ")))
}
if err := p.ChaosEngine().SetConfig(presetConfig); err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteOK(fmt.Sprintf("preset %s applied", config.Preset))
}
// hubHandleChaosSet handles CHAOS SET command.
func (d *Daemon) hubHandleChaosSet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS SET requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
var config proxy.ChaosConfig
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &config)
}
if err := p.ChaosEngine().SetConfig(&config); err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteOK("chaos config set")
}
// hubHandleChaosAddRule handles CHAOS ADD-RULE command.
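// The data payload wraps a proxy.ChaosRule under the "chaos_rule" key; the
// rule's own JSON fields are defined by the proxy package and its ID must be
// non-empty.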
func (d *Daemon) hubHandleChaosAddRule(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS ADD-RULE requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
var wrapper struct {
Rule proxy.ChaosRule `json:"chaos_rule"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &wrapper)
}
if wrapper.Rule.ID == "" {
return conn.WriteErr(hubproto.ErrInvalidArgs, "rule id is required")
}
if err := p.ChaosEngine().AddRule(&wrapper.Rule); err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteOK("rule added")
}
// hubHandleChaosRemoveRule handles CHAOS REMOVE-RULE command.
func (d *Daemon) hubHandleChaosRemoveRule(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS REMOVE-RULE requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
var config struct {
RuleID string `json:"chaos_rule_id"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &config)
}
if config.RuleID == "" {
return conn.WriteErr(hubproto.ErrInvalidArgs, "chaos_rule_id is required")
}
p.ChaosEngine().RemoveRule(config.RuleID)
return conn.WriteOK("rule removed")
}
// hubHandleChaosListRules handles CHAOS LIST-RULES command.
func (d *Daemon) hubHandleChaosListRules(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS LIST-RULES requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
config := p.ChaosEngine().GetConfig()
var rules []*proxy.ChaosRule
if config != nil {
rules = config.Rules
}
if rules == nil {
rules = []*proxy.ChaosRule{}
}
data, _ := json.Marshal(map[string]interface{}{"rules": rules})
return conn.WriteJSON(data)
}
// hubHandleChaosStats handles CHAOS STATS command.
func (d *Daemon) hubHandleChaosStats(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS STATS requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
stats := p.ChaosEngine().GetStats()
data, _ := json.Marshal(stats)
return conn.WriteJSON(data)
}
// hubHandleChaosClear handles CHAOS CLEAR command.
func (d *Daemon) hubHandleChaosClear(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "CHAOS CLEAR requires: <proxy_id>")
}
proxyID := cmd.Args[0]
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
p.ChaosEngine().Clear()
return conn.WriteOK("chaos cleared")
}
// hubHandleChaosListPresets handles CHAOS LIST-PRESETS command.
func (d *Daemon) hubHandleChaosListPresets(conn *hubpkg.Connection) error {
presets := proxy.ListPresets()
data, _ := json.Marshal(map[string]interface{}{"presets": presets})
return conn.WriteJSON(data)
}
// hubHandleSession handles the SESSION command.
func (d *Daemon) hubHandleSession(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "SESSION %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "REGISTER":
return d.hubHandleSessionRegister(conn, cmd)
case "UNREGISTER":
return d.hubHandleSessionUnregister(conn, cmd)
case "HEARTBEAT":
return d.hubHandleSessionHeartbeat(conn, cmd)
case "LIST":
return d.hubHandleSessionList(conn, cmd)
case "GET":
return d.hubHandleSessionGet(conn, cmd)
case "SEND":
return d.hubHandleSessionSend(conn, cmd)
case "SCHEDULE":
return d.hubHandleSessionSchedule(conn, cmd)
case "CANCEL":
return d.hubHandleSessionCancel(conn, cmd)
case "TASKS":
return d.hubHandleSessionTasks(conn, cmd)
case "FIND":
return d.hubHandleSessionFind(conn, cmd)
case "ATTACH":
return d.hubHandleSessionAttach(conn, cmd)
case "URL":
return d.hubHandleSessionURL(conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidArgs,
Message: "unknown SESSION sub-command",
Command: "SESSION",
ValidActions: []string{"REGISTER", "UNREGISTER", "HEARTBEAT", "LIST", "GET", "SEND", "SCHEDULE", "CANCEL", "TASKS", "FIND", "ATTACH", "URL"},
})
}
}
// hubHandleSessionRegister handles SESSION REGISTER command.
// SESSION REGISTER <code> <overlay_path> -- <json_metadata>
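// The optional metadata payload uses the keys declared below, for example
// (illustrative values): {"project_path":"/home/me/app","command":"agnt","args":["run","dev"]}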
func (d *Daemon) hubHandleSessionRegister(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 2 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION REGISTER requires: <code> <overlay_path>")
}
code := cmd.Args[0]
overlayPath := cmd.Args[1]
// Parse optional metadata from data payload
var metadata struct {
ProjectPath string `json:"project_path"`
Command string `json:"command"`
Args []string `json:"args"`
}
if len(cmd.Data) > 0 {
json.Unmarshal(cmd.Data, &metadata)
}
// Create session
session := &Session{
Code: code,
OverlayPath: overlayPath,
ProjectPath: normalizePath(metadata.ProjectPath),
Command: metadata.Command,
Args: metadata.Args,
StartedAt: time.Now(),
Status: SessionStatusActive,
LastSeen: time.Now(),
}
if err := d.sessionRegistry.Register(session); err != nil {
return conn.WriteErr(hubproto.ErrAlreadyExists, err.Error())
}
// Associate session with this connection for cleanup
conn.SetSessionCode(code)
// Run autostart for this project
autostartResult := d.RunAutostart(context.Background(), metadata.ProjectPath)
resp := map[string]interface{}{
"code": code,
"autostart": autostartResult,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionUnregister handles SESSION UNREGISTER command.
// SESSION UNREGISTER <code>
func (d *Daemon) hubHandleSessionUnregister(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION UNREGISTER requires: <code>")
}
code := cmd.Args[0]
// Clean up session resources (processes, proxies) before unregistering
d.CleanupSessionResources(code)
return conn.WriteOK(fmt.Sprintf("session %s unregistered", code))
}
// hubHandleSessionHeartbeat handles SESSION HEARTBEAT command.
// SESSION HEARTBEAT <code>
func (d *Daemon) hubHandleSessionHeartbeat(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION HEARTBEAT requires: <code>")
}
code := cmd.Args[0]
if err := d.sessionRegistry.Heartbeat(code); err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
return conn.WriteOK("heartbeat received")
}
// hubHandleSessionList handles SESSION LIST command.
// SESSION LIST [-- <directory_filter_json>]
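//
// Illustrative filter payload (the directory value is hypothetical); with no
// payload an empty filter is used:
//
//	SESSION LIST -- {"directory":"/home/dev/app","global":false}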
func (d *Daemon) hubHandleSessionList(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var filter struct {
Directory string `json:"directory"`
Global bool `json:"global"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &filter); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid filter JSON: "+err.Error())
}
}
sessions := d.sessionRegistry.List(normalizePath(filter.Directory), filter.Global)
// Convert to response format
sessionList := make([]map[string]interface{}, 0, len(sessions))
for _, s := range sessions {
sessionList = append(sessionList, s.ToJSON())
}
resp := map[string]interface{}{
"sessions": sessionList,
"count": len(sessionList),
"directory": filter.Directory,
"global": filter.Global,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionGet handles SESSION GET command.
// SESSION GET <code>
func (d *Daemon) hubHandleSessionGet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION GET requires: <code>")
}
code := cmd.Args[0]
session, ok := d.sessionRegistry.Get(code)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("session %q not found", code))
}
data, _ := json.Marshal(session.ToJSON())
return conn.WriteJSON(data)
}
// hubHandleSessionSend handles SESSION SEND command.
// SESSION SEND <code> -- <message>
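//
// Illustrative exchange (session code and message are hypothetical):
//
//	SESSION SEND a1b2c3 -- continue with the next task
//	=> {"success":true,"session_code":"a1b2c3","message_len":27}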
func (d *Daemon) hubHandleSessionSend(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION SEND requires: <code>")
}
if len(cmd.Data) == 0 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION SEND requires message data")
}
code := cmd.Args[0]
message := string(cmd.Data)
// Get session
session, ok := d.sessionRegistry.Get(code)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("session %q not found", code))
}
if session.GetStatus() != SessionStatusActive {
return conn.WriteErr(hubproto.ErrInvalidState, fmt.Sprintf("session %q is not active", code))
}
// Send message to overlay
if err := d.sendMessageToOverlay(session.OverlayPath, message); err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("failed to send message: %v", err))
}
resp := map[string]interface{}{
"success": true,
"session_code": code,
"message_len": len(message),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionSchedule handles SESSION SCHEDULE command.
// SESSION SCHEDULE <code> <duration> -- <message>
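//
// The duration uses Go's time.ParseDuration syntax (e.g. "90s", "5m", "1h30m").
// Illustrative exchange (code, task ID, and timestamp are hypothetical):
//
//	SESSION SCHEDULE a1b2c3 5m -- check the build output
//	=> {"task_id":"...","session_code":"a1b2c3","deliver_at":"2025-01-01T12:05:00Z","message_len":22}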
func (d *Daemon) hubHandleSessionSchedule(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 2 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION SCHEDULE requires: <code> <duration>")
}
if len(cmd.Data) == 0 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION SCHEDULE requires message data")
}
code := cmd.Args[0]
durationStr := cmd.Args[1]
message := string(cmd.Data)
// Parse duration
duration, err := time.ParseDuration(durationStr)
if err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, fmt.Sprintf("invalid duration %q: %v", durationStr, err))
}
// Get session to determine project path
session, ok := d.sessionRegistry.Get(code)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("session %q not found", code))
}
// Schedule the task
task, err := d.scheduler.Schedule(code, duration, message, session.ProjectPath)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("failed to schedule: %v", err))
}
resp := map[string]interface{}{
"task_id": task.ID,
"session_code": code,
"deliver_at": task.DeliverAt.Format(time.RFC3339),
"message_len": len(message),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionCancel handles SESSION CANCEL command.
// SESSION CANCEL <task_id>
func (d *Daemon) hubHandleSessionCancel(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION CANCEL requires: <task_id>")
}
taskID := cmd.Args[0]
if err := d.scheduler.Cancel(taskID); err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
return conn.WriteOK(fmt.Sprintf("task %s cancelled", taskID))
}
// hubHandleSessionTasks handles SESSION TASKS command.
// SESSION TASKS [-- <directory_filter_json>]
func (d *Daemon) hubHandleSessionTasks(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var filter struct {
Directory string `json:"directory"`
Global bool `json:"global"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &filter); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid filter JSON: "+err.Error())
}
}
tasks := d.scheduler.ListTasks(normalizePath(filter.Directory), filter.Global)
// Convert to response format
taskList := make([]map[string]interface{}, 0, len(tasks))
for _, t := range tasks {
taskList = append(taskList, t.ToJSON())
}
resp := map[string]interface{}{
"tasks": taskList,
"count": len(taskList),
"directory": filter.Directory,
"global": filter.Global,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionFind handles SESSION FIND command.
// SESSION FIND <directory>
func (d *Daemon) hubHandleSessionFind(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION FIND requires: <directory>")
}
directory := cmd.Args[0]
session, found := d.sessionRegistry.FindByDirectory(directory)
if !found {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("no active session found for directory %q or its parents", directory))
}
data, _ := json.Marshal(session.ToJSON())
return conn.WriteJSON(data)
}
// hubHandleSessionAttach handles SESSION ATTACH command.
// SESSION ATTACH <directory>
func (d *Daemon) hubHandleSessionAttach(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION ATTACH requires: <directory>")
}
directory := cmd.Args[0]
session, found := d.sessionRegistry.FindByDirectory(directory)
if !found {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("no active session found for directory %q or its parents", directory))
}
// Associate this connection with the session
conn.SetSessionCode(session.Code)
resp := map[string]interface{}{
"attached": true,
"session_code": session.Code,
"project_path": session.ProjectPath,
"command": session.Command,
"started_at": session.StartedAt.Format(time.RFC3339),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleSessionURL handles SESSION URL command.
// Reports a detected URL from an agnt run session, triggering proxy creation.
// SESSION URL <code> <url> -- {"script": "dev"}
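//
// Illustrative exchange (session code and URL are hypothetical; script_id
// follows the {basename}:{scriptName} convention described below):
//
//	SESSION URL a1b2c3 http://localhost:3000 -- {"script":"dev"}
//	=> {"success":true,"session_code":"a1b2c3","url":"http://localhost:3000","script":"dev","script_id":"app:dev"}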
func (d *Daemon) hubHandleSessionURL(conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 2 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "SESSION URL requires: <code> <url>")
}
code := cmd.Args[0]
detectedURL := cmd.Args[1]
// Get session
session, ok := d.sessionRegistry.Get(code)
if !ok {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("session %q not found", code))
}
// Parse script name from data payload (default to "dev")
scriptName := "dev"
if len(cmd.Data) > 0 {
var data struct {
Script string `json:"script"`
}
if err := json.Unmarshal(cmd.Data, &data); err == nil && data.Script != "" {
scriptName = data.Script
}
}
// Construct scriptID in the format: {basename}:{scriptName}
scriptID := makeProcessID(session.ProjectPath, scriptName)
// Send proxy event to trigger proxy creation
select {
case d.proxyEvents <- ProxyEvent{
Type: URLDetected,
ScriptID: scriptID,
URL: detectedURL,
Path: session.ProjectPath,
}:
// Event queued successfully
default:
return conn.WriteErr(hubproto.ErrInternal, "proxy event queue full")
}
resp := map[string]interface{}{
"success": true,
"session_code": code,
"url": detectedURL,
"script": scriptName,
"script_id": scriptID,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// sendMessageToOverlay sends a message to an overlay socket.
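// The overlay exposes an HTTP endpoint over a Unix socket; an equivalent manual
// call would look roughly like this (socket path is hypothetical):
//
//	curl --unix-socket /tmp/agnt/a1b2c3.sock -X POST -H 'Content-Type: text/plain' --data 'hello' http://unix/inject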
func (d *Daemon) sendMessageToOverlay(socketPath string, message string) error {
// Create HTTP client that connects via Unix socket
client := &http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
var dialer net.Dialer
return dialer.DialContext(ctx, "unix", socketPath)
},
},
}
// Build request body
body := bytes.NewBufferString(message)
// POST to /inject endpoint
req, err := http.NewRequest("POST", "http://unix/inject", body)
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "text/plain")
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send to overlay: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("overlay returned status %d", resp.StatusCode)
}
return nil
}
// hubHandleStore handles the STORE command and its sub-verbs.
func (d *Daemon) hubHandleStore(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "STORE %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "GET":
return d.hubHandleStoreGet(conn, cmd)
case "SET":
return d.hubHandleStoreSet(conn, cmd)
case "DELETE":
return d.hubHandleStoreDelete(conn, cmd)
case "LIST":
return d.hubHandleStoreList(conn, cmd)
case "CLEAR":
return d.hubHandleStoreClear(conn, cmd)
case "GET-ALL":
return d.hubHandleStoreGetAll(conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidAction,
Message: "unknown STORE sub-command",
Command: "STORE",
ValidActions: []string{"GET", "SET", "DELETE", "LIST", "CLEAR", "GET-ALL"},
})
}
}
// hubHandleStoreGet handles STORE GET command.
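// Illustrative request payload; scope and key are required, scope_key is
// optional (values are hypothetical):
//
//	STORE GET -- {"scope":"session","scope_key":"a1b2c3","key":"last_url"}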
func (d *Daemon) hubHandleStoreGet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
Key string `json:"key"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
if req.Key == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "key is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
entry, err := d.storem.Get(basePath, req.Scope, req.ScopeKey, req.Key)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
data, _ := json.Marshal(entry)
return conn.WriteJSON(data)
}
// hubHandleStoreSet handles STORE SET command.
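// Illustrative request payload (values are hypothetical); value may be any
// JSON type and metadata is optional:
//
//	STORE SET -- {"scope":"session","scope_key":"a1b2c3","key":"last_url","value":"http://localhost:3000","metadata":{"source":"proxy"}}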
func (d *Daemon) hubHandleStoreSet(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
Key string `json:"key"`
Value interface{} `json:"value"`
Metadata map[string]any `json:"metadata,omitempty"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
if req.Key == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "key is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
if err := d.storem.Set(basePath, req.Scope, req.ScopeKey, req.Key, req.Value, req.Metadata); err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteOK("value stored")
}
// hubHandleStoreDelete handles STORE DELETE command.
func (d *Daemon) hubHandleStoreDelete(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
Key string `json:"key"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
if req.Key == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "key is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
if err := d.storem.Delete(basePath, req.Scope, req.ScopeKey, req.Key); err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
return conn.WriteOK("key deleted")
}
// hubHandleStoreList handles STORE LIST command.
func (d *Daemon) hubHandleStoreList(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
keys, err := d.storem.List(basePath, req.Scope, req.ScopeKey)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
resp := map[string]interface{}{
"keys": keys,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleStoreClear handles STORE CLEAR command.
func (d *Daemon) hubHandleStoreClear(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
if err := d.storem.Clear(basePath, req.Scope, req.ScopeKey); err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteOK("scope cleared")
}
// hubHandleStoreGetAll handles STORE GET-ALL command.
func (d *Daemon) hubHandleStoreGetAll(conn *hubpkg.Connection, cmd *hubproto.Command) error {
var req struct {
Scope string `json:"scope"`
ScopeKey string `json:"scope_key"`
}
if len(cmd.Data) > 0 {
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid request JSON: "+err.Error())
}
}
if req.Scope == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "scope is required")
}
// Get project path from session
basePath := d.getSessionProjectPath(conn)
if basePath == "" {
return conn.WriteErr(hubproto.ErrInvalidState, "no active session with project path")
}
entries, err := d.storem.GetAll(basePath, req.Scope, req.ScopeKey)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
resp := map[string]interface{}{
"entries": entries,
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleStatus handles the STATUS command.
// Returns full daemon info (Hub's built-in INFO only returns minimal data).
func (d *Daemon) hubHandleStatus(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "STATUS: args=%v", cmd.Args)
info := d.Info()
data, err := json.Marshal(info)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
return conn.WriteJSON(data)
}
// hubHandleAutomate handles the AUTOMATE command and its sub-verbs.
func (d *Daemon) hubHandleAutomate(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "AUTOMATE %s: args=%v", cmd.SubVerb, cmd.Args)
switch cmd.SubVerb {
case "PROCESS":
return d.hubHandleAutomateProcess(ctx, conn, cmd)
case "BATCH":
return d.hubHandleAutomateBatch(ctx, conn, cmd)
default:
return conn.WriteStructuredErr(&hubproto.StructuredError{
Code: hubproto.ErrInvalidAction,
Message: "unknown AUTOMATE sub-command",
Command: "AUTOMATE",
ValidActions: []string{"PROCESS", "BATCH"},
})
}
}
// getOrCreateAutomator returns the automation processor, creating it on first use.
func (d *Daemon) getOrCreateAutomator() (*automation.Processor, error) {
if d.automator != nil {
return d.automator, nil
}
// Create automation processor with default config
proc, err := automation.New(automation.DefaultConfig())
if err != nil {
return nil, fmt.Errorf("failed to create automation processor: %w", err)
}
d.automator = proc
return d.automator, nil
}
// hubHandleAutomateProcess handles AUTOMATE PROCESS command.
// AUTOMATE PROCESS -- <json_task>
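// Illustrative task payload; the task type and data fields are hypothetical and
// depend on what the automation package accepts:
//
//	AUTOMATE PROCESS -- {"type":"summarize","data":{"text":"..."},"options":{"max_tokens":256}}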
func (d *Daemon) hubHandleAutomateProcess(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Data) == 0 {
return conn.WriteErr(hubproto.ErrMissingParam, "task data required")
}
// Parse the task request
var req struct {
Type string `json:"type"`
Data map[string]interface{} `json:"data"`
Context map[string]interface{} `json:"context"`
Options struct {
Model string `json:"model,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
} `json:"options,omitempty"`
}
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid task JSON: "+err.Error())
}
if req.Type == "" {
return conn.WriteErr(hubproto.ErrMissingParam, "task type required")
}
// Get or create the automation processor
proc, err := d.getOrCreateAutomator()
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Create the task
task := automation.Task{
Type: automation.TaskType(req.Type),
Input: req.Data,
Context: req.Context,
Options: automation.TaskOptions{
Model: req.Options.Model,
MaxTokens: req.Options.MaxTokens,
Temperature: req.Options.Temperature,
},
}
// Process the task
startTime := time.Now()
result, err := proc.Process(ctx, task)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Build response
resp := map[string]interface{}{
"success": result.Error == nil,
"duration": time.Since(startTime).String(),
}
if result.Error != nil {
resp["error"] = result.Error.Error()
} else {
resp["result"] = result.Output
}
resp["tokens_used"] = result.Tokens
resp["cost_usd"] = result.Cost
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleAutomateBatch handles AUTOMATE BATCH command.
// AUTOMATE BATCH -- <json_tasks>
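// Illustrative batch payload (task types and data are hypothetical):
//
//	AUTOMATE BATCH -- {"tasks":[{"type":"summarize","data":{"text":"..."}},{"type":"classify","data":{"text":"..."}}]}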
func (d *Daemon) hubHandleAutomateBatch(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Data) == 0 {
return conn.WriteErr(hubproto.ErrMissingParam, "tasks data required")
}
// Parse the batch request
var req struct {
Tasks []struct {
Type string `json:"type"`
Data map[string]interface{} `json:"data"`
Context map[string]interface{} `json:"context"`
Options struct {
Model string `json:"model,omitempty"`
MaxTokens int `json:"max_tokens,omitempty"`
Temperature float64 `json:"temperature,omitempty"`
} `json:"options,omitempty"`
} `json:"tasks"`
}
if err := json.Unmarshal(cmd.Data, &req); err != nil {
return conn.WriteErr(hubproto.ErrInvalidArgs, "invalid batch JSON: "+err.Error())
}
if len(req.Tasks) == 0 {
return conn.WriteErr(hubproto.ErrMissingParam, "at least one task required")
}
// Get or create the automation processor
proc, err := d.getOrCreateAutomator()
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Convert to automation tasks
tasks := make([]automation.Task, len(req.Tasks))
for i, t := range req.Tasks {
tasks[i] = automation.Task{
Type: automation.TaskType(t.Type),
Input: t.Data,
Context: t.Context,
Options: automation.TaskOptions{
Model: t.Options.Model,
MaxTokens: t.Options.MaxTokens,
Temperature: t.Options.Temperature,
},
}
}
// Process the batch
startTime := time.Now()
results, err := proc.ProcessBatch(ctx, tasks)
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, err.Error())
}
// Build response
resultList := make([]map[string]interface{}, len(results))
var totalTokens int
var totalCost float64
var successCount, failCount int
for i, result := range results {
r := map[string]interface{}{
"index": i,
"success": result != nil && result.Error == nil,
}
if result != nil {
if result.Error != nil {
r["error"] = result.Error.Error()
failCount++
} else {
r["result"] = result.Output
successCount++
}
r["tokens_used"] = result.Tokens
r["cost_usd"] = result.Cost
r["duration"] = result.Duration.String()
totalTokens += result.Tokens
totalCost += result.Cost
} else {
r["error"] = "no result"
failCount++
}
resultList[i] = r
}
resp := map[string]interface{}{
"results": resultList,
"total": len(results),
"succeeded": successCount,
"failed": failCount,
"total_tokens": totalTokens,
"total_cost": totalCost,
"duration": time.Since(startTime).String(),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleStopAll handles the STOP-ALL command.
// Stops all running processes, proxies, and tunnels without shutting down the daemon.
func (d *Daemon) hubHandleStopAll(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "STOP-ALL: args=%v", cmd.Args)
// Count resources before stopping
procsBefore := len(d.hub.ProcessManager().List())
proxiesBefore := len(d.proxym.List())
tunnelsBefore := d.tunnelm.ActiveCount()
// Stop all resources
d.StopAllResources(ctx)
resp := map[string]interface{}{
"success": true,
"processes_stopped": procsBefore,
"proxies_stopped": proxiesBefore,
"tunnels_stopped": tunnelsBefore,
"message": fmt.Sprintf("Stopped %d processes, %d proxies, %d tunnels", procsBefore, proxiesBefore, tunnelsBefore),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleRestartAll handles the RESTART-ALL command.
// Stops all resources and restarts them with the same configuration.
func (d *Daemon) hubHandleRestartAll(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
debug.Log("daemon", "RESTART-ALL: args=%v", cmd.Args)
// Capture running resources before stop
runningProcs := d.hub.ProcessManager().List()
runningProxies := d.proxym.List()
// Build restart manifests from running resources
type procManifest struct {
ID string
Command string
Args []string
ProjectPath string
}
type proxyManifest struct {
ID string
TargetURL string
Port int
MaxLogSize int
ProjectPath string
BindAddress string
}
var procsToRestart []procManifest
var proxiesToRestart []proxyManifest
for _, p := range runningProcs {
if p.State().String() == "running" {
procsToRestart = append(procsToRestart, procManifest{
ID: p.ID,
Command: p.Command,
Args: p.Args,
ProjectPath: p.ProjectPath,
})
}
}
for _, p := range runningProxies {
if p.IsRunning() {
proxiesToRestart = append(proxiesToRestart, proxyManifest{
ID: p.ID,
TargetURL: p.TargetURL.String(),
Port: 0, // Will use auto-port
MaxLogSize: int(p.Logger().Stats().MaxSize),
ProjectPath: p.Path,
BindAddress: p.BindAddress,
})
}
}
// Stop all resources
d.StopAllResources(ctx)
// Wait a moment for cleanup
time.Sleep(100 * time.Millisecond)
// Restart processes
var procsRestarted, procsFailed int
var proxyRestarted, proxyFailed int
for _, pm := range procsToRestart {
_, err := d.hub.ProcessManager().StartOrReuse(ctx, process.ProcessConfig{
ID: pm.ID,
ProjectPath: pm.ProjectPath,
Command: pm.Command,
Args: pm.Args,
})
if err != nil {
log.Printf("[RESTART-ALL] Failed to restart process %s: %v", pm.ID, err)
procsFailed++
} else {
procsRestarted++
}
}
// Restart proxies
for _, pm := range proxiesToRestart {
_, err := d.proxym.Create(ctx, proxy.ProxyConfig{
ID: pm.ID,
TargetURL: pm.TargetURL,
ListenPort: pm.Port,
MaxLogSize: pm.MaxLogSize,
Path: pm.ProjectPath,
BindAddress: pm.BindAddress,
})
if err != nil {
log.Printf("[RESTART-ALL] Failed to restart proxy %s: %v", pm.ID, err)
proxyFailed++
} else {
proxyRestarted++
}
}
resp := map[string]interface{}{
"success": true,
"processes_restarted": procsRestarted,
"processes_failed": procsFailed,
"proxies_restarted": proxyRestarted,
"proxies_failed": proxyFailed,
"message": fmt.Sprintf("Restarted %d/%d processes, %d/%d proxies",
procsRestarted, len(procsToRestart), proxyRestarted, len(proxiesToRestart)),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProcRestart handles PROC RESTART <id>.
// Stops a process and restarts it with the same configuration.
// If a rogue process is using the expected port, it will be killed first.
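//
// Illustrative exchange (process ID, PID, and state are hypothetical; only a
// subset of the response fields is shown):
//
//	PROC RESTART app:dev
//	=> {"id":"app:dev","state":"running","pid":12345,"restarted":true,"success":true,...}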
func (d *Daemon) hubHandleProcRestart(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrMissingParam, "process_id required")
}
processID := cmd.Args[0]
// Get the process to capture its config
proc, err := d.hub.ProcessManager().Get(processID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, fmt.Sprintf("process %q not found", processID))
}
// Capture config before stopping
command := proc.Command
args := proc.Args
projectPath := proc.ProjectPath
// Check if process is in a restartable state
state := proc.State().String()
if state != "running" && state != "stopped" && state != "failed" {
return conn.WriteErr(hubproto.ErrInvalidState, fmt.Sprintf("process %q is in state %s, cannot restart", processID, state))
}
// Stop the process if running
if state == "running" {
stopCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := d.hub.ProcessManager().Stop(stopCtx, processID); err != nil {
log.Printf("[PROC RESTART] Warning: error stopping process %s: %v", processID, err)
}
// Wait for process to fully stop
time.Sleep(100 * time.Millisecond)
}
// Detect expected port and clean up any rogue processes using it
expectedPort := d.getExpectedPortForProcess(proc)
var killedPIDs []int
if expectedPort > 0 {
killedPIDs, err = d.preflightPortCleanup(ctx, expectedPort)
if err != nil {
log.Printf("[PROC RESTART] Warning: port cleanup failed for port %d: %v", expectedPort, err)
} else if len(killedPIDs) > 0 {
log.Printf("[PROC RESTART] Killed rogue process(es) on port %d: %v", expectedPort, killedPIDs)
}
}
// Remove the old process registration
d.hub.ProcessManager().RemoveByPath(processID, projectPath)
// Start the process with the same config
result, err := d.hub.ProcessManager().StartOrReuse(ctx, process.ProcessConfig{
ID: processID,
ProjectPath: projectPath,
Command: command,
Args: args,
})
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("failed to restart: %v", err))
}
newProc := result.Process
resp := map[string]interface{}{
"id": processID,
"process_id": processID,
"command": command,
"args": args,
"project_path": projectPath,
"state": newProc.State().String(),
"pid": newProc.PID(),
"restarted": true,
"success": true,
"message": fmt.Sprintf("Process %q restarted successfully", processID),
}
// Include info about killed rogue processes
if len(killedPIDs) > 0 {
resp["rogue_processes_killed"] = killedPIDs
resp["port_cleaned"] = expectedPort
resp["message"] = fmt.Sprintf("Process %q restarted successfully (killed rogue process(es) on port %d)", processID, expectedPort)
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}
// hubHandleProxyRestart handles PROXY RESTART <id>.
// Stops a proxy and restarts it with the same configuration.
func (d *Daemon) hubHandleProxyRestart(ctx context.Context, conn *hubpkg.Connection, cmd *hubproto.Command) error {
if len(cmd.Args) < 1 {
return conn.WriteErr(hubproto.ErrInvalidArgs, "PROXY RESTART requires: <id>")
}
proxyID := cmd.Args[0]
// Get the proxy to capture its config
p, err := d.getSessionScopedProxy(conn, proxyID)
if err != nil {
return conn.WriteErr(hubproto.ErrNotFound, err.Error())
}
// Capture config before stopping
targetURL := p.TargetURL.String()
maxLogSize := int(p.Logger().Stats().MaxSize)
projectPath := p.Path
bindAddress := p.BindAddress
// Stop the proxy
if err := d.proxym.Stop(ctx, proxyID); err != nil {
log.Printf("[PROXY RESTART] Warning: error stopping proxy %s: %v", proxyID, err)
}
// Remove from persisted state
if d.stateMgr != nil {
d.stateMgr.RemoveProxy(proxyID)
}
// Wait for cleanup
time.Sleep(100 * time.Millisecond)
// Create new proxy with same config
newProxy, err := d.proxym.Create(ctx, proxy.ProxyConfig{
ID: proxyID,
TargetURL: targetURL,
ListenPort: 0, // Auto-assign port
MaxLogSize: maxLogSize,
Path: projectPath,
BindAddress: bindAddress,
})
if err != nil {
return conn.WriteErr(hubproto.ErrInternal, fmt.Sprintf("failed to restart proxy: %v", err))
}
// Persist the new proxy state
if d.stateMgr != nil {
d.stateMgr.AddProxy(PersistentProxyConfig{
ID: proxyID,
TargetURL: targetURL,
Port: 0, // Auto-assigned
MaxLogSize: maxLogSize,
Path: projectPath,
})
}
resp := map[string]interface{}{
"id": proxyID,
"target_url": targetURL,
"listen_addr": newProxy.ListenAddr,
"restarted": true,
"success": true,
"message": fmt.Sprintf("Proxy %q restarted successfully", proxyID),
}
data, _ := json.Marshal(resp)
return conn.WriteJSON(data)
}